Skip to content

Commit

Permalink
Add WebWorker Thread Support for Running Tasks (#348)
Browse files Browse the repository at this point in the history
* Implement TaskThreadManager

This commit implements a task manager variant that uses worker threads. A new worker is spawned for each new task that appears. All functionality should work, but hasn't been fully tested at this time.

The TaskThreadManager is added to the export list for use in the
frontend.

* Fix Typos in TaskProcessManager

THis commit fixes some various spelling typos found in the process manager files when implementing the task thread manager.

* Use Task Thread Manager

This commit updates the frontend to support using the task thread manager. The dev environment is adjusted to use it, but more testing should occur before it is used as the default.

Further, a check was added to make sure worker threads are supported before using the TaskThreadManager. If the are not supported a warning message is printed and the TaskProcessManager is used instead.

* Address PR Comments

This commit addresses changes requested in PR:
- TaskManager now has an Abort event defined. this is used instead of a string literal
- TaskThreadManager now properly handles aborting on close
- TaskThreadManager has changes to remove eslint errors
- TaskThreadManager now checks for Webworker compatibliity in constructor. An error is thrown if there is no compatibility
- TaskThreadManager and TaskProcessManager are refactored slightly to reduce code duplication
- TaskManagerAdapter has better logic for falling back when worker threads are unavailable

* Restructure Manager Files

This commit restructures manager files into their own folders. This helps visually group similar classes/files.

* Consolidate Shared Transformer Code

This commit consolidates shared code between the two scripts that ran on child contexts (either process or thread). Most the the code has been extracted out to a base class and sub classes have been created for each implementation.

* Refactor Managers to Consolidate Code

This commit refactors similar code from the TaskProcessManager and TaskThreadManager into a new SplitContextTaskManager. This class is used as a base class for all future task managers that run in split contexts. Subclasses for processes and worker threads have been implemented and are now used by the frontend.

* Fix SendProxy Bug

This commit fixes a bug that prevented tasks from starting due to a typo in the send proxy side effect handler.

* Fix Launcher Object Destroyed Bug

This commit fixes a bug due to improper handling of the launcher ipc tunnel. When the tunnel closes, the launcher now does not attempt to send any messages, which prevents a crash like this from occurring.

* Adjust TaskEvent Registration Logic

This commit adjusts the logic surrounding registering for task events.
Now the launcher will register for events as soon as the adapter is
ready. This prevents any possibility of the task event chain not being
set up properly to forward events to the frontend.

This commit also hides console logs so they only show in dev mode and
renames the manager.html file to a more appropriate launcher.html

* Start Launcher when Transitioning to the Auth State

This commit fixes a bug where deactivating the frontend would prevent the task launcher from starting again. A code path to launch the main window was missing the call to start the launcher as well.

* Add Linter Exemptions for Certain Classes

We have a requirement to use instance methods within the Transformer subclasses, but the implementations don't use `this`. Since this is a valid exception to the rule, an eslint comment is added to the top of both to convey this.

See https://eslint.org/docs/rules/class-methods-use-this#exceptions for more details

* Adjust Event Emitter Max Listeners

This commit adjusts the max listeners for the TaskManager and TaskRunner event listeners. This allows many tasks (>25) to run without producing memory leak warnings.

* Fix Delay and Webhook Event Passing

This commit updates the event handlers to allow responding to a bulk event. In this case the ID passed will be 'ALL'. This allows a single event to be emitted when adjusting a delay or webhook in the TaskManager for all tasks.

This also keeps the contract of purely using event based communication between the manager and runner.

* Refactor Default TaskManager Implementation

This commit updates the default single thread TaskManager Implementation to have similar syntax as the SplitContextTaskManager. This ensures all TaskManger implementations work when used.

* Switch Defaults to use SplitThreadTaskManger

This commit updates the default task manager to use the multi thread implementation. The set env var for prod is also updated to use thread instead of process.

fixes #192
  • Loading branch information
pr1sm authored Feb 22, 2019
1 parent 11c7c4c commit af8b3d6
Show file tree
Hide file tree
Showing 18 changed files with 807 additions and 450 deletions.
2 changes: 1 addition & 1 deletion packages/frontend/.env.dev
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ NEBULA_ENV=development
NEBULA_ENV_SHOW_DEVTOOLS=true
NEBULA_START_URL=https://localhost:3000
# NEBULA_RUNNER_MOCK_SPECIAL_PARSER="DSM US"
NEBULA_RUNNER_CONCURRENCY_TYPE=process
NEBULA_RUNNER_CONCURRENCY_TYPE=thread
2 changes: 1 addition & 1 deletion packages/frontend/.env.prod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Nebula Env Variables PROD
NEBULA_API_URL=https://nebula-orion-api.herokuapp.com
NEBULA_ENV=production
NEBULA_RUNNER_CONCURRENCY_TYPE=process
NEBULA_RUNNER_CONCURRENCY_TYPE=thread
1 change: 1 addition & 0 deletions packages/frontend/lib/_electron/windowManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ class WindowManager {
*/
async transitiontoAuthedState() {
this._main = await createMainWindow();
this._context.taskLauncher.start();
const winUrl = urls.get('main');
this._main.loadURL(winUrl);

Expand Down
20 changes: 16 additions & 4 deletions packages/frontend/lib/task/adapter.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
// eslint-disable-next-line import/no-extraneous-dependencies
const { ipcRenderer } = require('electron');
const { TaskManager, TaskProcessManager } = require('@nebula/task-runner').shopify;
const {
TaskManager,
SplitThreadTaskManager,
SplitProcessTaskManager,
} = require('@nebula/task-runner').shopify;

const IPCKeys = require('../common/constants');
const nebulaEnv = require('../_electron/env');
Expand All @@ -18,12 +22,20 @@ class TaskManagerAdapter {
break;
}
case 'process': {
this._taskManager = new TaskProcessManager(logPath);
this._taskManager = new SplitProcessTaskManager(logPath);
break;
}
case 'thread':
default: {
// Use multiprocess TaskManager as default
this._taskManager = new TaskProcessManager(logPath);
// Use multithread TaskManager as default if supported
try {
this._taskManager = new SplitThreadTaskManager(logPath);
} catch (_) {
console.log(
'[WARNING]: Worker Thread are not supported in this environment! Falling back to multi-process manager...',
);
this._taskManager = new SplitProcessTaskManager(logPath);
}
break;
}
}
Expand Down
File renamed without changes.
66 changes: 45 additions & 21 deletions packages/frontend/lib/task/launcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class TaskLauncher {
].forEach(key => {
context.ipc.on(key, (ev, ...params) => {
if (this._launcherWindow && ev.sender !== this._launcherWindow.webContents) {
this._launcherWindow.webContents.send(key, ...params);
this._sendToLauncher(key, ...params);
}
});
});
Expand Down Expand Up @@ -87,17 +87,23 @@ class TaskLauncher {
async start() {
const session = await this._context.authManager.getSession();
if (!session) {
console.log('Frontend is not authed! Skipping start...');
if (nebulaEnv.isDevelopment()) {
console.log('Frontend is not authed! Skipping start...');
}
return;
}

if (this._launcherWindow) {
console.log('Already Launched! skipping start...');
if (nebulaEnv.isDevelopment()) {
console.log('Already Launched! skipping start...');
}
return;
}

console.log('Launching...');
const managerUrl = `file:///${path.resolve(__dirname, 'manager.html')}`;
if (nebulaEnv.isDevelopment()) {
console.log('Launching...');
}
const managerUrl = `file:///${path.resolve(__dirname, 'launcher.html')}`;
this._launcherWindow = new Electron.BrowserWindow({
frame: false,
fullscreenable: false,
Expand Down Expand Up @@ -125,30 +131,49 @@ class TaskLauncher {
this._launcherWindow.loadURL(managerUrl);

this._launcherWindow.on('ready-to-show', () => {
this._launcherWindow.webContents.send('LOG_PATH', _LOG_PATH);
this._sendToLauncher('LOG_PATH', _LOG_PATH);
// open dev tools if dev env
if (nebulaEnv.isDevelopment() || process.env.NEBULA_ENV_SHOW_DEVTOOLS) {
console.log('Launcher Ready!');
this._launcherWindow.webContents.openDevTools();
}

// Start listening for events since we have at least one listener
this._sendToLauncher(IPCKeys.RegisterTaskEventHandler);
this._context.ipc.on(_TASK_EVENT_KEY, this._taskEventHandler);
});

this._launcherWindow.webContents.on('destroyed', () => {
// Remove launcher window reference if it gets destroyed by an outside source
this._launcherWindow = null;

// Remove the handler for listening to task event statuses
this._context.ipc.removeListener(_TASK_EVENT_KEY, this._taskEventHandler);
});

this._launcherWindow.on('close', () => {
if (nebulaEnv.isDevelopment()) {
console.log('Launcher Closed!');
}
this._launcherWindow = null;
});

console.log('Launched!');
if (nebulaEnv.isDevelopment()) {
console.log('Launched!');
}
}

async stop() {
if (!this._launcherWindow) {
console.log('launcher was already stopped');
if (nebulaEnv.isDevelopment()) {
console.log('launcher was already stopped');
}
return;
}

console.log('Closing Launcher...');
if (nebulaEnv.isDevelopment()) {
console.log('Closing Launcher...');
}
await this.abortAllTasks();
this._launcherWindow.close();
this._launcherWindow = null;
Expand All @@ -165,12 +190,21 @@ class TaskLauncher {
resolve();
}
});
this._launcherWindow.webContents.send(IPCKeys.RequestAbortAllTasksForClose);
this._sendToLauncher(IPCKeys.RequestAbortAllTasksForClose);
});
}

_sendToLauncher(channel, ...params) {
if (this._launcherWindow) {
this._launcherWindow.webContents.send(channel, ...params);
}
}

_taskEventHandler(_, taskId, statusMessage) {
this._eventListeners.forEach(l => l.send(_TASK_EVENT_KEY, taskId, statusMessage));
// forward event if we have listeners
if (this._eventListeners.length > 0) {
this._eventListeners.forEach(l => l.send(_TASK_EVENT_KEY, taskId, statusMessage));
}
}

_onRegisterEventRequest(event) {
Expand Down Expand Up @@ -221,11 +255,6 @@ class TaskLauncher {
return;
}
this._eventListeners.push(listener);
if (this._eventListeners.length === 1) {
// Start listening for events since we have at least one listener
this._launcherWindow.webContents.send(IPCKeys.RegisterTaskEventHandler);
this._context.ipc.on(_TASK_EVENT_KEY, this._taskEventHandler);
}
}

_removeEventListener(listener) {
Expand All @@ -234,11 +263,6 @@ class TaskLauncher {
return;
}
this._eventListeners = this._eventListeners.filter(l => l !== listener);
if (this._eventListeners.length === 0) {
// Stop listening for events since we don't have any listeners
this._launcherWindow.webContents.send(IPCKeys.DeregisterTaskEventHandler);
this._context.ipc.removeListener(_TASK_EVENT_KEY, this._taskEventHandler);
}
}

async _startHarvestEventHandler(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const TaskManagerEvents = {
Abort: 'ABORT',
StartHarvest: 'START_CAPTCHA_HARVEST',
StopHarvest: 'STOP_CAPTCHA_HARVEST',
Harvest: 'CAPTCHA_HARVEST',
Expand Down
8 changes: 5 additions & 3 deletions packages/task-runner/src/shopify/index.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
const TaskManager = require('./taskManager');
const TaskProcessManager = require('./taskProcessManager');
const TaskManager = require('./managers/taskManager');
const SplitThreadTaskManager = require('./managers/splitThreadTaskManager');
const SplitProcessTaskManager = require('./managers/splitProcessTaskManager');
const TaskRunner = require('./taskRunner');

module.exports = {
TaskManager,
TaskProcessManager,
SplitThreadTaskManager,
SplitProcessTaskManager,
TaskRunner,
};
Loading

0 comments on commit af8b3d6

Please sign in to comment.