diff --git a/packages/gatsby-worker/package.json b/packages/gatsby-worker/package.json index 0506893d028ab..f2616b8a08fc5 100644 --- a/packages/gatsby-worker/package.json +++ b/packages/gatsby-worker/package.json @@ -8,7 +8,9 @@ }, "dependencies": { "@babel/core": "^7.15.5", - "@babel/runtime": "^7.15.4" + "@babel/runtime": "^7.15.4", + "fs-extra": "^10.0.0", + "signal-exit": "^3.0.5" }, "devDependencies": { "@babel/cli": "^7.15.4", diff --git a/packages/gatsby-worker/src/__tests__/fixtures/test-child.ts b/packages/gatsby-worker/src/__tests__/fixtures/test-child.ts index 2e0bfd45fcaec..ef7c9d602f0b5 100644 --- a/packages/gatsby-worker/src/__tests__/fixtures/test-child.ts +++ b/packages/gatsby-worker/src/__tests__/fixtures/test-child.ts @@ -82,7 +82,12 @@ interface IPingMessage { type: `PING` } -export type MessagesFromChild = IPingMessage +interface ILotOfMessagesTestMessage { + type: `LOT_OF_MESSAGES_TEST` + payload: number +} + +export type MessagesFromChild = IPingMessage | ILotOfMessagesTestMessage interface IPongMessage { type: `PONG` @@ -97,6 +102,10 @@ let getWasPonged = function (): boolean { throw new Error(`gatsby-worker messenger not available`) } +let lotOfMessagesAndExit = function (count: number): void { + throw new Error(`gatsby-worker messenger not available`) +} + const messenger = getMessenger() if (messenger) { let wasPonged = false @@ -126,6 +135,13 @@ if (messenger) { getWasPonged = function getWasPonged(): boolean { return wasPonged } + + lotOfMessagesAndExit = function lotOfMessagesAndExit(count: number): boolean { + for (let i = 0; i < count; i++) { + messenger.sendMessage({ type: `LOT_OF_MESSAGES_TEST`, payload: i }) + } + process.exit(1) + } } -export { setupPingPongMessages, getWasPonged } +export { setupPingPongMessages, getWasPonged, lotOfMessagesAndExit } diff --git a/packages/gatsby-worker/src/__tests__/integration.ts b/packages/gatsby-worker/src/__tests__/integration.ts index 9b27eb91f1651..05ba182f8b432 100644 --- a/packages/gatsby-worker/src/__tests__/integration.ts +++ b/packages/gatsby-worker/src/__tests__/integration.ts @@ -36,8 +36,8 @@ describe(`gatsby-worker`, () => { fail(`worker pool not created`) } - const exposedMethodsSingle = Object.keys(workerPool.single) - const exposedMethodsAll = Object.keys(workerPool.all) + const exposedMethodsSingle = Object.keys(workerPool.single).sort() + const exposedMethodsAll = Object.keys(workerPool.all).sort() // we expect that `notAFunction` even tho is exported in child module is not exposed // as it's not a function expect(exposedMethodsSingle).toMatchInlineSnapshot(` @@ -46,6 +46,7 @@ describe(`gatsby-worker`, () => { "async100ms", "asyncThrow", "getWasPonged", + "lotOfMessagesAndExit", "neverEnding", "pid", "setupPingPongMessages", @@ -426,5 +427,27 @@ describe(`gatsby-worker`, () => { workerPool.sendMessage({ type: `PONG` }, 9001) }).toThrowError(`There is no worker with "9001" id.`) }) + + it(`messages are not lost if worker exits soon after sending a message`, async () => { + if (!workerPool) { + fail(`worker pool not created`) + } + const COUNT = 10000 + + let counter = 0 + workerPool.onMessage(msg => { + if (msg.type === `LOT_OF_MESSAGES_TEST`) { + counter++ + } + }) + + try { + await workerPool.single.lotOfMessagesAndExit(COUNT) + } catch (e) { + console.log(e) + } + + expect(counter).toEqual(COUNT) + }) }) }) diff --git a/packages/gatsby-worker/src/child.ts b/packages/gatsby-worker/src/child.ts index 23def265ff1c6..67b2400e64eda 100644 --- a/packages/gatsby-worker/src/child.ts +++ b/packages/gatsby-worker/src/child.ts @@ -1,3 +1,5 @@ +import signalExit from "signal-exit" +import fs from "fs-extra" import { ParentMessageUnion, ChildMessageUnion, @@ -10,6 +12,7 @@ import { } from "./types" import { isPromise } from "./utils" +let counter = 0 export interface IGatsbyWorkerMessenger< MessagesFromParent = unknown, MessagesFromChild = MessagesFromParent @@ -30,12 +33,35 @@ let getMessenger = function < return undefined } -if (process.send && process.env.GATSBY_WORKER_MODULE_PATH) { +if ( + process.send && + process.env.GATSBY_WORKER_MODULE_PATH && + process.env.GATSBY_WORKER_IN_FLIGHT_DUMP_LOCATION +) { + const workerInFlightsDumpLocation = + process.env.GATSBY_WORKER_IN_FLIGHT_DUMP_LOCATION isWorker = true const listeners: Array<(msg: any) => void> = [] - const ensuredSendToMain = process.send.bind(process) as ( - msg: ChildMessageUnion - ) => void + + const inFlightMessages = new Set() + signalExit(() => { + if (inFlightMessages.size > 0) { + // this need to be sync + fs.outputJsonSync( + workerInFlightsDumpLocation, + Array.from(inFlightMessages) + ) + } + }) + + function ensuredSendToMain(msg: ChildMessageUnion): void { + inFlightMessages.add(msg) + process.send!(msg, undefined, undefined, error => { + if (!error) { + inFlightMessages.delete(msg) + } + }) + } function onError(error: Error): void { if (error == null) { @@ -44,6 +70,7 @@ if (process.send && process.env.GATSBY_WORKER_MODULE_PATH) { const msg: ChildMessageUnion = [ ERROR, + ++counter, error.constructor && error.constructor.name, error.message, error.stack, @@ -54,7 +81,7 @@ if (process.send && process.env.GATSBY_WORKER_MODULE_PATH) { } function onResult(result: unknown): void { - const msg: ChildMessageUnion = [RESULT, result] + const msg: ChildMessageUnion = [RESULT, ++counter, result] ensuredSendToMain(msg) } @@ -69,7 +96,7 @@ if (process.send && process.env.GATSBY_WORKER_MODULE_PATH) { listeners.push(listener) }, sendMessage(msg: MessagesFromChild): void { - const poolMsg: ChildMessageUnion = [CUSTOM_MESSAGE, msg] + const poolMsg: ChildMessageUnion = [CUSTOM_MESSAGE, ++counter, msg] ensuredSendToMain(poolMsg) }, messagingVersion: MESSAGING_VERSION, @@ -82,7 +109,7 @@ if (process.send && process.env.GATSBY_WORKER_MODULE_PATH) { if (msg[0] === EXECUTE) { let result try { - result = child[msg[1]].call(child, ...msg[2]) + result = child[msg[2]].call(child, ...msg[3]) } catch (e) { onError(e) return @@ -97,14 +124,14 @@ if (process.send && process.env.GATSBY_WORKER_MODULE_PATH) { process.off(`message`, messageHandler) } else if (msg[0] === CUSTOM_MESSAGE) { for (const listener of listeners) { - listener(msg[1]) + listener(msg[2]) } } } process.on(`message`, messageHandler) - ensuredSendToMain([WORKER_READY]) + ensuredSendToMain([WORKER_READY, ++counter]) } export { isWorker, getMessenger } diff --git a/packages/gatsby-worker/src/index.ts b/packages/gatsby-worker/src/index.ts index adde5a869a030..828939a3cce81 100644 --- a/packages/gatsby-worker/src/index.ts +++ b/packages/gatsby-worker/src/index.ts @@ -1,4 +1,7 @@ -import { fork, ChildProcess } from "child_process" +import { fork } from "child_process" +import fs from "fs-extra" +import os from "os" +import path from "path" import { TaskQueue } from "./task-queue" import { @@ -82,7 +85,9 @@ class TaskInfo { interface IWorkerInfo { workerId: number - worker: ChildProcess + send: (msg: ParentMessageUnion) => void + kill: (signal?: NodeJS.Signals | number) => boolean + lastMessage: number exitedPromise: Promise<{ code: number | null signal: NodeJS.Signals | null @@ -131,6 +136,7 @@ export class WorkerPool< private idleWorkers: Set> = new Set() private listeners: Array<(msg: MessagesFromChild, workerId: number) => void> = [] + private counter = 0 constructor(private workerPath: string, private options?: IWorkerOptions) { const single: Partial["single"]> = {} @@ -170,8 +176,14 @@ export class WorkerPool< } private startAll(): void { + this.counter = 0 + const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), `gatsby-worker`)) const options = this.options for (let workerId = 1; workerId <= (options?.numWorkers ?? 1); workerId++) { + const workerInFlightsDumpLocation = path.join( + tmpDir, + `worker-${workerId}.json` + ) const worker = fork(childWrapperPath, { cwd: process.cwd(), env: { @@ -179,6 +191,7 @@ export class WorkerPool< ...(options?.env ?? {}), GATSBY_WORKER_ID: workerId.toString(), GATSBY_WORKER_MODULE_PATH: this.workerPath, + GATSBY_WORKER_IN_FLIGHT_DUMP_LOCATION: workerInFlightsDumpLocation, }, // Suppress --debug / --inspect flags while preserving others (like --harmony). execArgv: process.execArgv.filter(v => !/^--(debug|inspect)/.test(v)), @@ -186,28 +199,66 @@ export class WorkerPool< }) let workerReadyResolve: () => void + let workerExitResolve: (arg: { + code: number | null + signal: NodeJS.Signals | null + }) => void + const workerInfo: IWorkerInfo = { workerId, - worker, + send: (msg: ParentMessageUnion): void => { + if (!worker.connected) { + return + } + + worker.send(msg, undefined, undefined, error => { + if (error && worker.connected) { + throw error + } + }) + }, + kill: worker.kill.bind(worker), ready: new Promise(resolve => { workerReadyResolve = resolve }), + lastMessage: 0, exitedPromise: new Promise(resolve => { - worker.on(`exit`, (code, signal) => { - if (workerInfo.currentTask) { - // worker exited without finishing a task - workerInfo.currentTask.reject( - new Error(`Worker exited before finishing task`) - ) - } - // remove worker from list of workers - this.workers.splice(this.workers.indexOf(workerInfo), 1) - resolve({ code, signal }) - }) + workerExitResolve = resolve }), } - worker.on(`message`, (msg: ChildMessageUnion) => { + const workerProcessMessageHandler = (msg: ChildMessageUnion): void => { + if (!Array.isArray(msg)) { + // all gatsby-worker messages should be an array + // if it's not an array we skip it + return + } else if (msg[1] <= workerInfo.lastMessage) { + // this message was already handled, so skipping it + // this is specifically for special casing worker exits + // where we serialize "in-flight" IPC messages to fs + // and "replay" them here to ensure no messages are lost + // Trickiness is that while we write out in flight IPC messages + // to fs, those messages might actually still go through as regular + // ipc messages so we have to ensure we don't handle same message twice + return + } else if (msg[1] !== workerInfo.lastMessage + 1) { + // TODO: figure out IPC message order guarantees (or lack of them) - for now + // condition above relies on IPC messages being received in same order + // as they were sent via `process.send` in child process + // generally we expect messages we receive to be next one (lastMessage + 1) + // IF order is not guaranteed, then different strategy for de-duping messages + // is needed. + throw new Error( + `[gatsby-worker] Out of order message. Expected ${ + workerInfo.lastMessage + 1 + }, got ${msg[1]}.\n\nFull message:\n${JSON.stringify( + msg, + null, + 2 + )}.` + ) + } + workerInfo.lastMessage = msg[1] if (msg[0] === RESULT) { if (!workerInfo.currentTask) { throw new Error( @@ -217,7 +268,7 @@ export class WorkerPool< const task = workerInfo.currentTask workerInfo.currentTask = undefined this.checkForWork(workerInfo) - task.resolve(msg[1]) + task.resolve(msg[2]) } else if (msg[0] === ERROR) { if (!workerInfo.currentTask) { throw new Error( @@ -225,18 +276,18 @@ export class WorkerPool< ) } - let error = msg[4] + let error = msg[5] if (error !== null && typeof error === `object`) { const extra = error - const NativeCtor = global[msg[1]] + const NativeCtor = global[msg[2]] const Ctor = typeof NativeCtor === `function` ? NativeCtor : Error - error = new Ctor(msg[2]) + error = new Ctor(msg[3]) // @ts-ignore type doesn't exist on Error, but that's what jest-worker does for errors :shrug: - error.type = msg[1] - error.stack = msg[3] + error.type = msg[2] + error.stack = msg[4] for (const key in extra) { if (Object.prototype.hasOwnProperty.call(extra, key)) { @@ -251,11 +302,39 @@ export class WorkerPool< task.reject(error) } else if (msg[0] === CUSTOM_MESSAGE) { for (const listener of this.listeners) { - listener(msg[1] as MessagesFromChild, workerId) + listener(msg[2] as MessagesFromChild, workerId) } } else if (msg[0] === WORKER_READY) { workerReadyResolve() } + } + + worker.on(`message`, workerProcessMessageHandler) + worker.on(`exit`, async (code, signal) => { + if (await fs.pathExists(workerInFlightsDumpLocation)) { + const pendingMessages = await fs.readJSON(workerInFlightsDumpLocation) + if (Array.isArray(pendingMessages)) { + for (const msg of pendingMessages) { + workerProcessMessageHandler(msg) + } + } + try { + await fs.remove(workerInFlightsDumpLocation) + } catch { + // this is just cleanup, failing to delete this file + // won't cause + } + } + + if (workerInfo.currentTask) { + // worker exited without finishing a task + workerInfo.currentTask.reject( + new Error(`Worker exited before finishing task`) + ) + } + // remove worker from list of workers + this.workers.splice(this.workers.indexOf(workerInfo), 1) + workerExitResolve({ code, signal }) }) this.workers.push(workerInfo) @@ -270,13 +349,13 @@ export class WorkerPool< end(): Array> { const results = this.workers.map(async workerInfo => { // tell worker to end gracefully - const endMessage: ParentMessageUnion = [END] + const endMessage: ParentMessageUnion = [END, ++this.counter] - workerInfo.worker.send(endMessage) + workerInfo.send(endMessage) // force exit if worker doesn't exit gracefully quickly const forceExitTimeout = setTimeout(() => { - workerInfo.worker.kill(`SIGKILL`) + workerInfo.kill(`SIGKILL`) }, 1000) const exitResult = await workerInfo.exitedPromise @@ -342,10 +421,11 @@ export class WorkerPool< const msg: ParentMessageUnion = [ EXECUTE, + ++this.counter, taskInfo.functionName, taskInfo.args, ] - workerInfo.worker.send(msg) + workerInfo.send(msg) } private scheduleWork( @@ -402,8 +482,8 @@ export class WorkerPool< throw new Error(`There is no worker with "${workerId}" id.`) } - const poolMsg = [CUSTOM_MESSAGE, msg] - worker.worker.send(poolMsg) + const poolMsg: ParentMessageUnion = [CUSTOM_MESSAGE, ++this.counter, msg] + worker.send(poolMsg) } } diff --git a/packages/gatsby-worker/src/types.ts b/packages/gatsby-worker/src/types.ts index 59dc47fa4e4c2..a92fad26c7d94 100644 --- a/packages/gatsby-worker/src/types.ts +++ b/packages/gatsby-worker/src/types.ts @@ -5,14 +5,16 @@ export const END = 0b00 export const CUSTOM_MESSAGE = 0b100 export const WORKER_READY = 0b1000 -type CustomMessage = [typeof CUSTOM_MESSAGE, unknown] +type Counter = number + +type CustomMessage = [typeof CUSTOM_MESSAGE, Counter, unknown] type FunctionName = string | number | symbol type FunctionArgs = Array -type ExecuteMessage = [typeof EXECUTE, FunctionName, FunctionArgs] -type EndMessage = [typeof END] -type WorkerReadyMessage = [typeof WORKER_READY] +type ExecuteMessage = [typeof EXECUTE, Counter, FunctionName, FunctionArgs] +type EndMessage = [typeof END, Counter] +type WorkerReadyMessage = [typeof WORKER_READY, Counter] export type ParentMessageUnion = ExecuteMessage | EndMessage | CustomMessage @@ -22,6 +24,7 @@ type ErrorStack = string type TaskError = [ typeof ERROR, + Counter, ErrorType, ErrorMessage, ErrorStack | undefined, @@ -30,7 +33,7 @@ type TaskError = [ type ResultType = unknown -type TaskResult = [typeof RESULT, ResultType] +type TaskResult = [typeof RESULT, Counter, ResultType] export type ChildMessageUnion = | TaskError diff --git a/packages/gatsby/src/utils/jobs/types.ts b/packages/gatsby/src/utils/jobs/types.ts index 6f3436ffe8b3f..f9797d3abc60d 100644 --- a/packages/gatsby/src/utils/jobs/types.ts +++ b/packages/gatsby/src/utils/jobs/types.ts @@ -77,6 +77,11 @@ export class WorkerError extends Error { this.name = `WorkerError` - Error.captureStackTrace(this, WorkerError) + if (typeof error === `string`) { + Error.captureStackTrace(this, WorkerError) + } else { + // inherit stack from original error so actual stack trace persist + this.stack = error.stack + } } } diff --git a/packages/gatsby/src/utils/jobs/worker-messaging.ts b/packages/gatsby/src/utils/jobs/worker-messaging.ts index 211ac432c7ebd..5c153ede3897e 100644 --- a/packages/gatsby/src/utils/jobs/worker-messaging.ts +++ b/packages/gatsby/src/utils/jobs/worker-messaging.ts @@ -44,6 +44,7 @@ export function initJobsMessagingInMainProcess( payload: { id: msg.payload.id, error: error.message, + stack: error.stack, }, }, workerId @@ -88,7 +89,7 @@ export function initJobsMessagingInWorker(): void { deferredPromise.resolve(result) deferredWorkerPromises.delete(id) } else if (msg.type === MESSAGE_TYPES.JOB_FAILED) { - const { id, error } = msg.payload + const { id, error, stack } = msg.payload const deferredPromise = deferredWorkerPromises.get(id) if (!deferredPromise) { @@ -97,7 +98,11 @@ export function initJobsMessagingInWorker(): void { ) } - deferredPromise.reject(new WorkerError(error)) + const errorObject = new WorkerError(error) + if (stack) { + errorObject.stack = stack + } + deferredPromise.reject(errorObject) deferredWorkerPromises.delete(id) } }) diff --git a/packages/gatsby/src/utils/worker/__tests__/jobs.ts b/packages/gatsby/src/utils/worker/__tests__/jobs.ts index 5fb4143c647a2..03aac60792266 100644 --- a/packages/gatsby/src/utils/worker/__tests__/jobs.ts +++ b/packages/gatsby/src/utils/worker/__tests__/jobs.ts @@ -243,6 +243,7 @@ describe(`worker (jobs)`, () => { payload: { id: expect.any(String), error: expect.any(String), + stack: expect.any(String), }, }), expect.toBeOneOf([1, 2, 3]) diff --git a/packages/gatsby/src/utils/worker/messaging.ts b/packages/gatsby/src/utils/worker/messaging.ts index 8386e892c9bf7..4c29a207f6156 100644 --- a/packages/gatsby/src/utils/worker/messaging.ts +++ b/packages/gatsby/src/utils/worker/messaging.ts @@ -3,10 +3,20 @@ import { ReporterMessagesFromChild } from "gatsby-cli/lib/reporter/types" import { IJobCreatedMessage, IJobCompletedMessage, - IJobFailed, + MESSAGE_TYPES, + InternalJob, } from "../jobs/types" -export type MessagesFromParent = IJobCompletedMessage | IJobFailed +interface IJobFailedSerialized { + type: MESSAGE_TYPES.JOB_FAILED + payload: { + id: InternalJob["id"] + error: string + stack?: string + } +} + +export type MessagesFromParent = IJobCompletedMessage | IJobFailedSerialized export type MessagesFromChild = IJobCreatedMessage | ReporterMessagesFromChild export type GatsbyWorkerMessenger = IGatsbyWorkerMessenger<