Skip to content

Commit

Permalink
feat: use worker-rpc library for inter-process communication (#246)
Browse files Browse the repository at this point in the history
* add worker-rpc for inter-process-communication (use signal-based messaging for minimal code changes)
* use rpc calls instead of signaling
  • Loading branch information
phryneas authored and piotr-oles committed Apr 17, 2019
1 parent 6bdf632 commit 4817f01
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 203 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@
"micromatch": "^3.1.10",
"minimatch": "^3.0.4",
"semver": "^5.6.0",
"tapable": "^1.0.0"
"tapable": "^1.0.0",
"worker-rpc": "^0.1.0"
},
"husky": {
"hooks": {
Expand Down
2 changes: 1 addition & 1 deletion src/CancellationToken.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import * as ts from 'typescript'; // Imported for types alone

import { FsHelper } from './FsHelper';

interface CancellationTokenData {
export interface CancellationTokenData {
isCancelled: boolean;
cancellationFileName: string;
}
Expand Down
9 changes: 9 additions & 0 deletions src/RpcTypes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { CancellationTokenData } from './CancellationToken';
import { Message } from './Message';

export const RUN = 'run';
export type RunPayload = CancellationTokenData;
export type RunResult =
| Message
// when run was cancelled via CancellationToken, undefined is returned
| undefined;
54 changes: 0 additions & 54 deletions src/WorkResult.ts

This file was deleted.

83 changes: 47 additions & 36 deletions src/cluster.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import * as childProcess from 'child_process';
import * as path from 'path';
import * as process from 'process';
import { RpcProvider } from 'worker-rpc';

import { WorkResult } from './WorkResult';
import { NormalizedMessage } from './NormalizedMessage';
import { Message } from './Message';
import { RunPayload, RunResult, RUN } from './RpcTypes';

// fork workers...
const division = parseInt(process.env.WORK_DIVISION || '', 10);
Expand All @@ -20,54 +21,64 @@ for (let num = 0; num < division; num++) {
);
}

const pids = workers.map(worker => worker.pid);
const result = new WorkResult(pids);
// communication with parent process
const parentRpc = new RpcProvider(message => {
try {
process.send!(message);
} catch (e) {
// channel closed...
process.exit();
}
});
process.on('message', message => parentRpc.dispatch(message));

process.on('message', (message: Message) => {
// broadcast message to all workers
workers.forEach(worker => {
// communication with worker processes
const workerRpcs = workers.map(worker => {
const rpc = new RpcProvider(message => {
try {
worker.send(message);
} catch (e) {
// channel closed - something went wrong - close cluster...
process.exit();
}
});

// clear previous result set
result.clear();
worker.on('message', message => rpc.dispatch(message));
return rpc;
});

// listen to all workers
workers.forEach(worker => {
worker.on('message', (message: Message) => {
// set result from worker
result.set(worker.pid, {
diagnostics: message.diagnostics.map(NormalizedMessage.createFromJSON),
lints: message.lints.map(NormalizedMessage.createFromJSON)
});
parentRpc.registerRpcHandler<RunPayload, RunResult>(RUN, async message => {
const workerResults = await Promise.all(
workerRpcs.map(workerRpc =>
workerRpc.rpc<RunPayload, RunResult>(RUN, message)
)
);

// if we have result from all workers, send merged
if (result.hasAll()) {
const merged: Message = result.reduce(
(innerMerged: Message, innerResult: Message) => ({
diagnostics: innerMerged.diagnostics.concat(innerResult.diagnostics),
lints: innerMerged.lints.concat(innerResult.lints)
}),
{ diagnostics: [], lints: [] }
);
function workerFinished(
workerResult: (Message | undefined)[]
): workerResult is Message[] {
return workerResult.every(result => typeof result !== 'undefined');
}

merged.diagnostics = NormalizedMessage.deduplicate(merged.diagnostics);
merged.lints = NormalizedMessage.deduplicate(merged.lints);
if (!workerFinished(workerResults)) {
return undefined;
}

try {
process.send!(merged);
} catch (e) {
// channel closed...
process.exit();
}
}
});
const merged: Message = workerResults.reduce(
(innerMerged: Message, innerResult: Message) => ({
diagnostics: innerMerged.diagnostics.concat(
innerResult.diagnostics.map(NormalizedMessage.createFromJSON)
),
lints: innerMerged.lints.concat(
innerResult.lints.map(NormalizedMessage.createFromJSON)
)
}),
{ diagnostics: [], lints: [] }
);

merged.diagnostics = NormalizedMessage.deduplicate(merged.diagnostics);
merged.lints = NormalizedMessage.deduplicate(merged.lints);

return merged;
});

process.on('SIGINT', () => {
Expand Down
27 changes: 22 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as path from 'path';
import * as process from 'process';
import * as childProcess from 'child_process';
import { RpcProvider } from 'worker-rpc';
import * as semver from 'semver';
import chalk, { Chalk } from 'chalk';
import * as micromatch from 'micromatch';
Expand All @@ -17,6 +18,7 @@ import { FsHelper } from './FsHelper';
import { Message } from './Message';

import { getForkTsCheckerWebpackPluginHooks, legacyHookMap } from './hooks';
import { RunPayload, RunResult, RUN } from './RpcTypes';

const checkerPluginName = 'fork-ts-checker-webpack-plugin';

Expand Down Expand Up @@ -121,6 +123,7 @@ class ForkTsCheckerWebpackPlugin {
private tslintVersion: string;

private service?: childProcess.ChildProcess;
private serviceRpc?: RpcProvider;

private vue: boolean;

Expand Down Expand Up @@ -395,7 +398,14 @@ class ForkTsCheckerWebpackPlugin {
if (this.measureTime) {
this.startAt = this.performance.now();
}
this.service!.send(this.cancellationToken);
this.serviceRpc!.rpc<RunPayload, RunResult>(
RUN,
this.cancellationToken.toJSON()
).then(result => {
if (result) {
this.handleServiceMessage(result);
}
});
} catch (error) {
if (!this.silent && this.logger) {
this.logger.error(
Expand Down Expand Up @@ -440,7 +450,14 @@ class ForkTsCheckerWebpackPlugin {
}

try {
this.service!.send(this.cancellationToken);
this.serviceRpc!.rpc<RunPayload, RunResult>(
RUN,
this.cancellationToken.toJSON()
).then(result => {
if (result) {
this.handleServiceMessage(result);
}
});
} catch (error) {
if (!this.silent && this.logger) {
this.logger.error(
Expand Down Expand Up @@ -578,6 +595,8 @@ class ForkTsCheckerWebpackPlugin {
stdio: ['inherit', 'inherit', 'inherit', 'ipc']
}
);
this.serviceRpc = new RpcProvider(message => this.service!.send(message));
this.service.on('message', message => this.serviceRpc!.dispatch(message));

if ('hooks' in this.compiler) {
// webpack 4+
Expand Down Expand Up @@ -630,9 +649,6 @@ class ForkTsCheckerWebpackPlugin {
}
}

this.service.on('message', (message: Message) =>
this.handleServiceMessage(message)
);
this.service.on('exit', (code: string | number, signal: string) =>
this.handleServiceExit(code, signal)
);
Expand All @@ -649,6 +665,7 @@ class ForkTsCheckerWebpackPlugin {

this.service.kill();
this.service = undefined;
this.serviceRpc = undefined;
} catch (e) {
if (this.logger && !this.silent) {
this.logger.error(e);
Expand Down
39 changes: 25 additions & 14 deletions src/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ import {
makeCreateNormalizedMessageFromDiagnostic,
makeCreateNormalizedMessageFromRuleFailure
} from './NormalizedMessageFactories';
import { RpcProvider } from 'worker-rpc';
import { RunPayload, RunResult, RUN } from './RpcTypes';

const rpc = new RpcProvider(message => {
try {
process.send!(message);
} catch (e) {
// channel closed...
process.exit();
}
});
process.on('message', message => rpc.dispatch(message));

const typescript: typeof ts = require(process.env.TYPESCRIPT_PATH!);

Expand Down Expand Up @@ -61,28 +73,27 @@ async function run(cancellationToken: CancellationToken) {
}
} catch (error) {
if (error instanceof typescript.OperationCanceledException) {
return;
return undefined;
}

throw error;
}

if (!cancellationToken.isCancellationRequested()) {
try {
process.send!({
diagnostics,
lints
});
} catch (e) {
// channel closed...
process.exit();
}
if (cancellationToken.isCancellationRequested()) {
return undefined;
}

return {
diagnostics,
lints
};
}

process.on('message', message => {
run(CancellationToken.createFromJSON(typescript, message));
});
rpc.registerRpcHandler<RunPayload, RunResult>(RUN, message =>
typeof message !== 'undefined'
? run(CancellationToken.createFromJSON(typescript, message!))
: undefined
);

process.on('SIGINT', () => {
process.exit();
Expand Down
Loading

0 comments on commit 4817f01

Please sign in to comment.