Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WebWorker Thread Support for Running Tasks #348

Merged
merged 17 commits into from
Feb 22, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/frontend/.env.dev
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ NEBULA_ENV=development
NEBULA_ENV_SHOW_DEVTOOLS=true
NEBULA_START_URL=https://localhost:3000
# NEBULA_RUNNER_MOCK_SPECIAL_PARSER="DSM US"
NEBULA_RUNNER_CONCURRENCY_TYPE=process
NEBULA_RUNNER_CONCURRENCY_TYPE=thread
pr1sm marked this conversation as resolved.
Show resolved Hide resolved
18 changes: 17 additions & 1 deletion packages/frontend/lib/task/adapter.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
// eslint-disable-next-line import/no-extraneous-dependencies
const { ipcRenderer } = require('electron');
const { TaskManager, TaskProcessManager } = require('@nebula/task-runner').shopify;
const {
TaskManager,
TaskThreadManager,
TaskProcessManager,
} = require('@nebula/task-runner').shopify;

const IPCKeys = require('../common/constants');
const nebulaEnv = require('../_electron/env');
Expand All @@ -21,6 +25,18 @@ class TaskManagerAdapter {
this._taskManager = new TaskProcessManager(logPath);
break;
}
case 'thread': {
if (global.window.Worker) {
pr1sm marked this conversation as resolved.
Show resolved Hide resolved
pr1sm marked this conversation as resolved.
Show resolved Hide resolved
this._taskManager = new TaskThreadManager(logPath);
} else {
pr1sm marked this conversation as resolved.
Show resolved Hide resolved
console.log(
'[WARNING]: Worker Thread are not supported in this environment! Falling back to multi-process manager...',
);
this._taskManager = new TaskProcessManager(logPath);
}

break;
}
default: {
// Use multiprocess TaskManager as default
this._taskManager = new TaskProcessManager(logPath);
Expand Down
2 changes: 2 additions & 0 deletions packages/task-runner/src/shopify/index.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
const TaskManager = require('./taskManager');
const TaskThreadManager = require('./taskThreadManager');
const TaskProcessManager = require('./taskProcessManager');
const TaskRunner = require('./taskRunner');

module.exports = {
TaskManager,
TaskThreadManager,
TaskProcessManager,
TaskRunner,
};
6 changes: 3 additions & 3 deletions packages/task-runner/src/shopify/runnerProcess.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const TaskManagerEvents = constants.TaskManager.Events;
const TaskRunnerEvents = constants.TaskRunner.Events;

/**
* Notify the main process that and error occured
* Notify the main process that and error occurred
*
* @param {Error} error
*/
Expand All @@ -38,7 +38,7 @@ function wireErrorHandlers() {
function wireEventHandlers(runner) {
// Handle Incoming Process Events by calling the correct runner methods
process.on('message', ({ target, event, args }) => {
// Only respond to events that are targetting the child
// Only respond to events that are targeting the child
if (target !== 'child') {
return;
}
Expand Down Expand Up @@ -110,7 +110,7 @@ async function _start([rId, task, proxy, loggerPath]) {

// Setup a handler to listen for the start message...
process.on('message', async ({ target, event, args }) => {
// Ensure target is child, and event correct
// Ensure target is child, and event is correct
if (target !== 'child' || event !== '__start') {
return;
}
Expand Down
124 changes: 124 additions & 0 deletions packages/task-runner/src/shopify/runnerWorker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/**
* This script is run in a worker thread by the TaskThreadManager
*
* The goal of this script is to wire events between the worker and main thread
* and act as a silent man-in-the-middle between the TaskManager and TaskRunner
*/
const constants = require('./classes/utils/constants');
const TaskRunner = require('./taskRunner');

const TaskManagerEvents = constants.TaskManager.Events;
const TaskRunnerEvents = constants.TaskRunner.Events;

/**
* Notify the main process that an error occurred
*
* @param {Error} error
*/
function forwardError(error) {
const { stack, message, filename, lineno } = error;
global.postMessage({
target: 'main',
event: '__error',
error: { stack, message, filename, lineno },
});
}

let errorHandlerWired = false;
function wireErrorHandler() {
if (errorHandlerWired) {
return;
}

// Attach Runner Handlers to Errors
global.onerror = forwardError;
errorHandlerWired = true;
}

function wireEventHandlers(runner) {
// Handle Incoming Process Events by calling the correct runner methods
global.onmessage = ({ data: { target, event, args } }) => {
// Only respond to events that are targeting the worker
if (target !== 'worker') {
return;
}

// Only Handle Certain Events
switch (event) {
case 'abort': {
pr1sm marked this conversation as resolved.
Show resolved Hide resolved
// TODO: Respect the scope of Runner (issue #137)
runner._handleAbort(...args);
break;
}
case TaskManagerEvents.Harvest: {
// TODO: Respect the scope of Runner (issue #137)
runner._handleHarvest(...args);
break;
}
case TaskRunnerEvents.ReceiveProxy: {
// TODO: Respect the scope of Runner (issue #137)
runner._events.emit(TaskRunnerEvents.ReceiveProxy, ...args);
break;
}
case TaskManagerEvents.ChangeDelay: {
// TODO: Respect the scope of Runner (issue #137)
runner._events.emit(TaskManagerEvents.ChangeDelay, ...args);
break;
}
case TaskManagerEvents.UpdateHook: {
// TODO: Respect the scope of Runner (issue #137)
runner._events.emit(TaskManagerEvents.UpdateHook, ...args);
break;
}
default: {
break;
}
}
};

// Forward Runner Events to the Main Process
[
TaskRunnerEvents.TaskStatus,
TaskManagerEvents.StartHarvest,
TaskManagerEvents.StopHarvest,
TaskRunnerEvents.SwapProxy,
].forEach(event => {
// TODO: Respect the scope of Runner (issue #137)
runner._events.on(event, (...args) => {
global.postMessage({
target: 'main',
event,
args,
});
});
});
}

function cleanupEventHandlers(runner) {
// Detach runner event listeners
// TODO: Respect the scope of Runner (issue #137)
runner._events.removeAllListeners();
}

// Create the Runner, wire up events, run it, then cleanup events
async function _start([rId, task, proxy, loggerPath]) {
const runner = new TaskRunner(rId, task, proxy, loggerPath);
wireEventHandlers(runner);
await runner.start();
cleanupEventHandlers(runner);
}

// Setup an initial handler to listen for the start message...
global.onmessage = async ({ data: { target, event, args } }) => {
// Ensure target is worker, and event is correct
if (target !== 'worker' || event !== '__start') {
return;
}

wireErrorHandler();
await _start(args);
global.postMessage({
target: 'main',
event: '__done',
});
};
9 changes: 2 additions & 7 deletions packages/task-runner/src/shopify/taskProcessManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ class TaskProcessManager extends TaskManager {
delete this._handlers[child.id];

// Remove child handler
child.removeListener('mesage', childHandler);
child.removeListener('message', childHandler);

// Remove manager event handlers
this._events.removeListener('abort', abort);
Expand Down Expand Up @@ -217,12 +217,7 @@ class TaskProcessManager extends TaskManager {
});
this._logger.info('Runner %s finished without errors', runnerId);
} catch (error) {
this._logger.error(
'Runner %s was stopped due to an errors: %s',
runnerId,
error.message,
error,
);
this._logger.error('Runner %s was stopped due to error: %s', runnerId, error.message, error);
}

// Remove the done handler first
Expand Down
Loading