Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/gatsby-worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
},
"dependencies": {
"@babel/core": "^7.15.5",
"@babel/runtime": "^7.15.4"
"@babel/runtime": "^7.15.4",
"fs-extra": "^10.0.0",
"signal-exit": "^3.0.5"
},
"devDependencies": {
"@babel/cli": "^7.15.4",
Expand Down
20 changes: 18 additions & 2 deletions packages/gatsby-worker/src/__tests__/fixtures/test-child.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,12 @@ interface IPingMessage {
type: `PING`
}

export type MessagesFromChild = IPingMessage
interface ILotOfMessagesTestMessage {
type: `LOT_OF_MESSAGES_TEST`
payload: number
}

export type MessagesFromChild = IPingMessage | ILotOfMessagesTestMessage

interface IPongMessage {
type: `PONG`
Expand All @@ -97,6 +102,10 @@ let getWasPonged = function (): boolean {
throw new Error(`gatsby-worker messenger not available`)
}

let lotOfMessagesAndExit = function (count: number): void {
throw new Error(`gatsby-worker messenger not available`)
}

const messenger = getMessenger<MessagesFromParent, MessagesFromChild>()
if (messenger) {
let wasPonged = false
Expand Down Expand Up @@ -126,6 +135,13 @@ if (messenger) {
getWasPonged = function getWasPonged(): boolean {
return wasPonged
}

lotOfMessagesAndExit = function lotOfMessagesAndExit(count: number): boolean {
for (let i = 0; i < count; i++) {
messenger.sendMessage({ type: `LOT_OF_MESSAGES_TEST`, payload: i })
}
process.exit(1)
}
}

export { setupPingPongMessages, getWasPonged }
export { setupPingPongMessages, getWasPonged, lotOfMessagesAndExit }
27 changes: 25 additions & 2 deletions packages/gatsby-worker/src/__tests__/integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ describe(`gatsby-worker`, () => {
fail(`worker pool not created`)
}

const exposedMethodsSingle = Object.keys(workerPool.single)
const exposedMethodsAll = Object.keys(workerPool.all)
const exposedMethodsSingle = Object.keys(workerPool.single).sort()
const exposedMethodsAll = Object.keys(workerPool.all).sort()
// we expect that `notAFunction` even tho is exported in child module is not exposed
// as it's not a function
expect(exposedMethodsSingle).toMatchInlineSnapshot(`
Expand All @@ -46,6 +46,7 @@ describe(`gatsby-worker`, () => {
"async100ms",
"asyncThrow",
"getWasPonged",
"lotOfMessagesAndExit",
"neverEnding",
"pid",
"setupPingPongMessages",
Expand Down Expand Up @@ -426,5 +427,27 @@ describe(`gatsby-worker`, () => {
workerPool.sendMessage({ type: `PONG` }, 9001)
}).toThrowError(`There is no worker with "9001" id.`)
})

it(`messages are not lost if worker exits soon after sending a message`, async () => {
if (!workerPool) {
fail(`worker pool not created`)
}
const COUNT = 10000

let counter = 0
workerPool.onMessage(msg => {
if (msg.type === `LOT_OF_MESSAGES_TEST`) {
counter++
}
})

try {
await workerPool.single.lotOfMessagesAndExit(COUNT)
} catch (e) {
console.log(e)
}

expect(counter).toEqual(COUNT)
})
})
})
45 changes: 36 additions & 9 deletions packages/gatsby-worker/src/child.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import signalExit from "signal-exit"
import fs from "fs-extra"
import {
ParentMessageUnion,
ChildMessageUnion,
Expand All @@ -10,6 +12,7 @@ import {
} from "./types"
import { isPromise } from "./utils"

let counter = 0
export interface IGatsbyWorkerMessenger<
MessagesFromParent = unknown,
MessagesFromChild = MessagesFromParent
Expand All @@ -30,12 +33,35 @@ let getMessenger = function <
return undefined
}

if (process.send && process.env.GATSBY_WORKER_MODULE_PATH) {
if (
process.send &&
process.env.GATSBY_WORKER_MODULE_PATH &&
process.env.GATSBY_WORKER_IN_FLIGHT_DUMP_LOCATION
) {
const workerInFlightsDumpLocation =
process.env.GATSBY_WORKER_IN_FLIGHT_DUMP_LOCATION
isWorker = true
const listeners: Array<(msg: any) => void> = []
const ensuredSendToMain = process.send.bind(process) as (
msg: ChildMessageUnion
) => void

const inFlightMessages = new Set<ChildMessageUnion>()
signalExit(() => {
if (inFlightMessages.size > 0) {
// this need to be sync
fs.outputJsonSync(
workerInFlightsDumpLocation,
Array.from(inFlightMessages)
)
}
})

function ensuredSendToMain(msg: ChildMessageUnion): void {
inFlightMessages.add(msg)
process.send!(msg, undefined, undefined, error => {
if (!error) {
inFlightMessages.delete(msg)
}
})
}

function onError(error: Error): void {
if (error == null) {
Expand All @@ -44,6 +70,7 @@ if (process.send && process.env.GATSBY_WORKER_MODULE_PATH) {

const msg: ChildMessageUnion = [
ERROR,
++counter,
error.constructor && error.constructor.name,
error.message,
error.stack,
Expand All @@ -54,7 +81,7 @@ if (process.send && process.env.GATSBY_WORKER_MODULE_PATH) {
}

function onResult(result: unknown): void {
const msg: ChildMessageUnion = [RESULT, result]
const msg: ChildMessageUnion = [RESULT, ++counter, result]
ensuredSendToMain(msg)
}

Expand All @@ -69,7 +96,7 @@ if (process.send && process.env.GATSBY_WORKER_MODULE_PATH) {
listeners.push(listener)
},
sendMessage(msg: MessagesFromChild): void {
const poolMsg: ChildMessageUnion = [CUSTOM_MESSAGE, msg]
const poolMsg: ChildMessageUnion = [CUSTOM_MESSAGE, ++counter, msg]
ensuredSendToMain(poolMsg)
},
messagingVersion: MESSAGING_VERSION,
Expand All @@ -82,7 +109,7 @@ if (process.send && process.env.GATSBY_WORKER_MODULE_PATH) {
if (msg[0] === EXECUTE) {
let result
try {
result = child[msg[1]].call(child, ...msg[2])
result = child[msg[2]].call(child, ...msg[3])
} catch (e) {
onError(e)
return
Expand All @@ -97,14 +124,14 @@ if (process.send && process.env.GATSBY_WORKER_MODULE_PATH) {
process.off(`message`, messageHandler)
} else if (msg[0] === CUSTOM_MESSAGE) {
for (const listener of listeners) {
listener(msg[1])
listener(msg[2])
}
}
}

process.on(`message`, messageHandler)

ensuredSendToMain([WORKER_READY])
ensuredSendToMain([WORKER_READY, ++counter])
}

export { isWorker, getMessenger }
Loading