From 0e86fb723314d552b09f9a9347984c0aea6ec04b Mon Sep 17 00:00:00 2001 From: Vladimir Razuvaev Date: Thu, 6 May 2021 17:46:31 +0700 Subject: [PATCH] perf(gatsby): use fastq instead of better-queue + refactor (#31269) * perf(gatsby): refactor query queueing and use fastq instead of better-queue * restore graphql-runner * remove "better-queue" dependency --- packages/gatsby/package.json | 1 - .../__tests__/better-queue-custom-store.ts | 195 ----------------- .../src/query/better-queue-custom-store.ts | 202 ------------------ packages/gatsby/src/query/index.js | 166 ++++++++++---- packages/gatsby/src/query/queue.ts | 169 --------------- 5 files changed, 125 insertions(+), 608 deletions(-) delete mode 100644 packages/gatsby/src/query/__tests__/better-queue-custom-store.ts delete mode 100644 packages/gatsby/src/query/better-queue-custom-store.ts delete mode 100644 packages/gatsby/src/query/queue.ts diff --git a/packages/gatsby/package.json b/packages/gatsby/package.json index 5e46d79a64e62..cf9e7c9ec9d8a 100644 --- a/packages/gatsby/package.json +++ b/packages/gatsby/package.json @@ -37,7 +37,6 @@ "babel-plugin-remove-graphql-queries": "^3.5.0-next.0", "babel-preset-gatsby": "^1.5.0-next.0", "better-opn": "^2.0.0", - "better-queue": "^3.8.10", "bluebird": "^3.7.2", "body-parser": "^1.19.0", "browserslist": "^4.12.2", diff --git a/packages/gatsby/src/query/__tests__/better-queue-custom-store.ts b/packages/gatsby/src/query/__tests__/better-queue-custom-store.ts deleted file mode 100644 index 7b3171001ed33..0000000000000 --- a/packages/gatsby/src/query/__tests__/better-queue-custom-store.ts +++ /dev/null @@ -1,195 +0,0 @@ -import { memoryStoreWithPriorityBuckets } from "../better-queue-custom-store" -import pify from "pify" - -// those are tests copied from https://github.com/diamondio/better-queue-store-test/blob/master/tester.js -// and converted from mocha to jest + used pify to make it nicer to read than callback chain -describe(`Custom better-queue memory store`, () => { - let store - - const functions = [ - `connect`, - `getTask`, - `putTask`, - `deleteTask`, - `takeFirstN`, - `takeLastN`, - `getLock`, - `getRunningTasks`, - `releaseLock`, - ] - beforeEach(() => { - store = memoryStoreWithPriorityBuckets() - functions.forEach(fnName => { - if (store[fnName]) { - store[fnName] = pify(store[fnName]) - } - }) - }) - - it(`all required functions exist`, () => { - functions.forEach(fnName => { - expect(typeof store[fnName]).toBe(`function`) - }) - }) - - it(`connect starts empty`, async () => { - const len = await store.connect() - expect(len).toBe(0) - }) - - it(`put and get`, async () => { - await store.connect() - await store.putTask(`test`, { value: `secret` }, 1) - - const task = await store.getTask(`test`) - expect(task.value).toBe(`secret`) - }) - - it(`put 3, take last 2, take last 2`, async () => { - await store.connect() - await store.putTask(`task1`, { value: `secret 1` }, 1) - await store.putTask(`task2`, { value: `secret 2` }, 1) - await store.putTask(`task3`, { value: `secret 3` }, 1) - - let lockId: string = await store.takeLastN(2) - let tasks: any = await store.getLock(lockId) - - // should get the third task - expect(tasks.task3.value).toBe(`secret 3`) - // should get the second task - expect(tasks.task2.value).toBe(`secret 2`) - // should not get the first task - expect(tasks.task1).toBeUndefined() - - lockId = await store.takeLastN(2) - tasks = await store.getLock(lockId) - - // should not get the third task - expect(tasks.task3).toBeUndefined() - // should not get the second task - expect(tasks.task2).toBeUndefined() - // should get the first task - expect(tasks.task1.value).toBe(`secret 1`) - }) - - it(`put 3, take first 2, take first 2`, async () => { - await store.connect() - await store.putTask(`task1`, { value: `secret 1` }, 1) - await store.putTask(`task2`, { value: `secret 2` }, 1) - await store.putTask(`task3`, { value: `secret 3` }, 1) - - let lockId = await store.takeFirstN(2) - let tasks = await store.getLock(lockId) - - // should get the first task - expect(tasks.task1.value).toBe(`secret 1`) - // should get the second task - expect(tasks.task2.value).toBe(`secret 2`) - // should not get the third task - expect(tasks.task3).toBeUndefined() - - lockId = await store.takeFirstN(2) - tasks = await store.getLock(lockId) - - // should not get the first task - expect(tasks.task1).toBeUndefined() - // should not get the second task - expect(tasks.task2).toBeUndefined() - // should get the third task - expect(tasks.task3.value).toBe(`secret 3`) - }) - - it(`get and release workers`, async () => { - await store.connect() - await store.putTask(`task1`, { value: `secret 1` }, 1) - await store.putTask(`task2`, { value: `secret 2` }, 1) - await store.putTask(`task3`, { value: `secret 3` }, 1) - - const lock1: string = await store.takeFirstN(1) - const lock2: string = await store.takeLastN(1) - - let workers = await store.getRunningTasks() - - // should have first lock - expect(workers[lock1]).toBeDefined() - // should have second lock - expect(workers[lock2]).toBeDefined() - // should have two workers - expect(Object.keys(workers).length).toBe(2) - // should have task1 - expect(workers[lock1].task1.value).toBe(`secret 1`) - // should have task3 - expect(workers[lock2].task3.value).toBe(`secret 3`) - - await store.releaseLock(lock1) - await store.releaseLock(lock2) - - workers = await store.getRunningTasks() - - // should not have lock 1 - expect(workers[lock1]).toBeUndefined() - // should not have lock 2 - expect(workers[lock2]).toBeUndefined() - // should have no workers - expect(Object.keys(workers).length).toBe(0) - }) - - it(`put 4, delete 1, take first 2`, async () => { - await store.connect() - await store.putTask(`task1`, { value: `secret 1` }, 1) - await store.putTask(`task2`, { value: `secret 2` }, 1) - await store.putTask(`task3`, { value: `secret 3` }, 1) - await store.putTask(`task4`, { value: `secret 4` }, 1) - - // Remove the second - await store.deleteTask(`task2`) - - // take 2 - const lockId: string = await store.takeFirstN(2) - const tasks = await store.getLock(lockId) - - // should get the first task - expect(tasks.task1.value).toBe(`secret 1`) - // should not get the second task - expect(tasks.task2).toBeUndefined() - // should get the third task - expect(tasks.task3.value).toBe(`secret 3`) - // should not get the fourth task - expect(tasks.task4).toBeUndefined() - }) - - // extra tests to cover priority - it(`handles priority`, async () => { - await store.connect() - await store.putTask(`task1`, { value: `secret 1` }, 1) - await store.putTask(`task2`, { value: `secret 2` }, 3) - await store.putTask(`task3`, { value: `secret 3` }, 4) - await store.putTask(`task4`, { value: `secret 4` }, 2) - - // take first 2 - let lockId: string = await store.takeFirstN(2) - let tasks = await store.getLock(lockId) - - // should get the third task - expect(tasks.task3.value).toBe(`secret 3`) - // should get the second task - expect(tasks.task2.value).toBe(`secret 2`) - // should not get first task - expect(tasks.task1).toBeUndefined() - // should not get the fourth task - expect(tasks.task4).toBeUndefined() - - // take last 1 - lockId = await store.takeLastN(1) - tasks = await store.getLock(lockId) - - // should get the first task - expect(tasks.task1.value).toBe(`secret 1`) - // should not get second task - expect(tasks.task2).toBeUndefined() - // should not get third task - expect(tasks.task3).toBeUndefined() - // should not get the fourth task - expect(tasks.task4).toBeUndefined() - }) -}) diff --git a/packages/gatsby/src/query/better-queue-custom-store.ts b/packages/gatsby/src/query/better-queue-custom-store.ts deleted file mode 100644 index 4b90626f65061..0000000000000 --- a/packages/gatsby/src/query/better-queue-custom-store.ts +++ /dev/null @@ -1,202 +0,0 @@ -import { Store } from "better-queue" - -export function memoryStoreWithPriorityBuckets(): Store { - type RunningTasks = Record - let uuid = 0 - - /** - * Task ids grouped by priority - */ - const queueMap = new Map>() - - /** - * Task id to task lookup - */ - const tasks = new Map() - - /** - * Task id to priority lookup - */ - const taskIdToPriority = new Map() - - /** - * Lock to running tasks object - */ - const running: Record = {} - - let priorityKeys: Array = [] - const updatePriorityKeys = (): void => { - priorityKeys = Array.from(queueMap.keys()).sort((a, b) => b - a) - } - - const addTaskWithPriority = (taskId: string, priority: number): boolean => { - let needToUpdatePriorityKeys = false - let priorityTasks = queueMap.get(priority) - if (!priorityTasks) { - priorityTasks = [] - queueMap.set(priority, priorityTasks) - needToUpdatePriorityKeys = true - } - - taskIdToPriority.set(taskId, priority) - priorityTasks.push(taskId) - return needToUpdatePriorityKeys - } - - return { - connect: function (cb): void { - cb(null, tasks.size) - }, - getTask: function (taskId, cb): void { - // @ts-ignore - cb(null, tasks.get(taskId)) - }, - deleteTask: function (taskId, cb): void { - if (tasks.get(taskId)) { - tasks.delete(taskId) - const priority = taskIdToPriority.get(taskId) - if (priority) { - const priorityTasks = queueMap.get(priority) ?? [] - priorityTasks.splice(priorityTasks.indexOf(taskId), 1) - taskIdToPriority.delete(taskId) - } - } - cb() - }, - putTask: function (taskId, task, priority = 0, cb): void { - const oldTask = tasks.get(taskId) - tasks.set(taskId, task) - let needToUpdatePriorityKeys = false - if (oldTask) { - const oldPriority = taskIdToPriority.get(taskId) - - if (oldPriority && oldPriority !== priority) { - const oldPriorityTasks = queueMap.get(oldPriority) ?? [] - oldPriorityTasks.splice(oldPriorityTasks.indexOf(taskId), 1) - - if ( - addTaskWithPriority(taskId, priority) // || - // oldPriorityTasks.length === 0 - ) { - needToUpdatePriorityKeys = true - } - } - } else { - needToUpdatePriorityKeys = addTaskWithPriority(taskId, priority) - } - - if (needToUpdatePriorityKeys) { - updatePriorityKeys() - } - cb(null) - }, - takeFirstN: function (n, cb): void { - const lockId = String(uuid++) - let remainingTasks = n - let needToUpdatePriorityKeys = false - let haveSomeTasks = false - const tasksToRun: RunningTasks = {} - - for (const priority of priorityKeys) { - const tasksWithSamePriority = queueMap.get(priority) - const grabbedTaskIds = - tasksWithSamePriority?.splice(0, remainingTasks) ?? [] - grabbedTaskIds.forEach(taskId => { - // add task to task that will run - // and remove it from waiting list - const task = tasks.get(taskId) - if (task) { - tasksToRun[taskId] = task - tasks.delete(taskId) - taskIdToPriority.delete(taskId) - haveSomeTasks = true - } - }) - - remainingTasks -= grabbedTaskIds.length - if (tasksWithSamePriority?.length === 0) { - queueMap.delete(priority) - needToUpdatePriorityKeys = true - } - if (remainingTasks <= 0) { - break - } - } - - if (needToUpdatePriorityKeys) { - updatePriorityKeys() - } - - if (haveSomeTasks) { - running[lockId] = tasksToRun - } - - cb(null, lockId) - }, - takeLastN: function (n, cb): void { - // This is not really used by Gatsby, but will be implemented for - // completion in easiest possible way (so not very performant). - // Mostly done so generic test suite used by other stores passes. - // This is mostly C&P from takeFirstN, with array reversal and different - // splice args - const lockId = String(uuid++) - let remainingTasks = n - let needToUpdatePriorityKeys = false - let haveSomeTasks = false - const tasksToRun = {} - - for (const priority of priorityKeys.reverse()) { - const tasksWithSamePriority = queueMap.get(priority) ?? [] - const deleteCount = Math.min( - remainingTasks, - tasksWithSamePriority.length - ) - const grabbedTaskIds = tasksWithSamePriority.splice( - tasksWithSamePriority.length - deleteCount, - deleteCount - ) - grabbedTaskIds.forEach(taskId => { - // add task to task that will run - // and remove it from waiting list - tasksToRun[taskId] = tasks.get(taskId) - tasks.delete(taskId) - taskIdToPriority.delete(taskId) - haveSomeTasks = true - }) - - remainingTasks -= grabbedTaskIds.length - if (tasksWithSamePriority.length === 0) { - queueMap.delete(priority) - needToUpdatePriorityKeys = true - } - if (remainingTasks <= 0) { - break - } - } - - if (needToUpdatePriorityKeys) { - updatePriorityKeys() - } - - if (haveSomeTasks) { - running[lockId] = tasksToRun - } - - cb(null, lockId) - }, - // @ts-ignore - // getRunningTasks is an extension to the interface, and is only used in the tests - getRunningTasks: function ( - cb: (err?: any, value?: Record) => void - ): void { - cb(null, running) - }, - getLock: function (lockId, cb): void { - cb(null, running[lockId]) - }, - releaseLock: function (lockId, cb): void { - delete running[lockId] - cb(null) - }, - } -} diff --git a/packages/gatsby/src/query/index.js b/packages/gatsby/src/query/index.js index 37f02cbd64968..ba66598bfef2b 100644 --- a/packages/gatsby/src/query/index.js +++ b/packages/gatsby/src/query/index.js @@ -1,7 +1,19 @@ const _ = require(`lodash`) +const fastq = require(`fastq`) const { store } = require(`../redux`) const { hasFlag, FLAG_ERROR_EXTRACTION } = require(`../redux/reducers/queries`) -const queryQueue = require(`./queue`) +const { queryRunner } = require(`./query-runner`) +const { websocketManager } = require(`../utils/websocket-manager`) +const { GraphQLRunner } = require(`./graphql-runner`) + +if (process.env.GATSBY_EXPERIMENTAL_QUERY_CONCURRENCY) { + console.info( + `GATSBY_EXPERIMENTAL_QUERY_CONCURRENCY: Running with concurrency set to \`${process.env.GATSBY_EXPERIMENTAL_QUERY_CONCURRENCY}\`` + ) +} + +const concurrency = + Number(process.env.GATSBY_EXPERIMENTAL_QUERY_CONCURRENCY) || 4 /** * Calculates the set of dirty query IDs (page.paths, or staticQuery.id's). @@ -9,7 +21,7 @@ const queryQueue = require(`./queue`) * Dirty state is tracked in `queries` reducer, here we simply filter * them from all tracked queries. */ -const calcDirtyQueryIds = state => { +function calcDirtyQueryIds(state) { const { trackedQueries, trackedComponents, deletedQueries } = state.queries const queriesWithBabelErrors = new Set() @@ -36,7 +48,7 @@ const calcDirtyQueryIds = state => { /** * groups queryIds by whether they are static or page queries. */ -const groupQueryIds = queryIds => { +function groupQueryIds(queryIds) { const grouped = _.groupBy(queryIds, p => p.slice(0, 4) === `sq--` ? `static` : `page` ) @@ -46,18 +58,83 @@ const groupQueryIds = queryIds => { } } -const processQueries = async ( - queryJobs, - { activity, graphqlRunner, graphqlTracing } -) => { - const queue = queryQueue.createAppropriateQueue(graphqlRunner, { - graphqlTracing, +function createQueue({ + createJobFn, + state, + activity, + graphqlRunner, + graphqlTracing, +}) { + if (!graphqlRunner) { + graphqlRunner = new GraphQLRunner(store, { graphqlTracing }) + } + state = state || store.getState() + + function worker(queryId, cb) { + const job = createJobFn(state, queryId) + if (!job) { + cb(null, undefined) + return + } + queryRunner(graphqlRunner, job, activity?.span) + .then(result => { + if (activity.tick) { + activity.tick() + } + cb(null, { job, result }) + }) + .catch(error => { + cb(error) + }) + } + // Note: fastq.promise version is much slower + return fastq(worker, concurrency) +} + +async function processQueries({ + queryIds, + createJobFn, + onQueryDone, + state, + activity, + graphqlRunner, + graphqlTracing, +}) { + return new Promise((resolve, reject) => { + const fastQueue = createQueue({ + createJobFn, + state, + activity, + graphqlRunner, + graphqlTracing, + }) + + queryIds.forEach(queryId => { + fastQueue.push(queryId, (err, res) => { + if (err) { + fastQueue.kill() + reject(err) + return + } + if (res && onQueryDone) { + onQueryDone(res) + } + }) + }) + + if (!fastQueue.idle()) { + fastQueue.drain = () => resolve() + } else { + resolve() + } }) - return queryQueue.processBatch(queue, queryJobs, activity) } -const createStaticQueryJob = (state, queryId) => { +function createStaticQueryJob(state, queryId) { const component = state.staticQueryComponents.get(queryId) + if (!component) { + return undefined + } const { hash, id, query, componentPath } = component return { id: queryId, @@ -68,48 +145,55 @@ const createStaticQueryJob = (state, queryId) => { } } -const processStaticQueries = async ( - queryIds, - { state, activity, graphqlRunner, graphqlTracing } -) => { - state = state || store.getState() - await processQueries( - queryIds.map(id => createStaticQueryJob(state, id)), - { - activity, - graphqlRunner, - graphqlTracing, - } - ) +function onDevelopStaticQueryDone({ job, result }) { + websocketManager.emitStaticQueryData({ + result, + id: job.hash, + }) } -const processPageQueries = async ( +async function processStaticQueries( queryIds, { state, activity, graphqlRunner, graphqlTracing } -) => { - state = state || store.getState() - // Make sure we filter out pages that don't exist. An example is - // /dev-404-page/, whose SitePage node is created via - // `internal-data-bridge`, but the actual page object is only - // created during `gatsby develop`. - - const jobs = [] - queryIds.forEach(id => { - const page = state.pages.get(id) - if (page) { - const job = createPageQueryJob(state, page) - jobs.push(job) - } +) { + return processQueries({ + queryIds, + createJobFn: createStaticQueryJob, + onQueryDone: + process.env.NODE_ENV === `production` + ? undefined + : onDevelopStaticQueryDone, + state, + activity, + graphqlRunner, + graphqlTracing, }) +} - await processQueries(jobs, { +async function processPageQueries( + queryIds, + { state, activity, graphqlRunner, graphqlTracing } +) { + return processQueries({ + queryIds, + createJobFn: createPageQueryJob, + state, activity, graphqlRunner, graphqlTracing, }) } -const createPageQueryJob = (state, page) => { +function createPageQueryJob(state, queryId) { + const page = state.pages.get(queryId) + + // Make sure we filter out pages that don't exist. An example is + // /dev-404-page/, whose SitePage node is created via + // `internal-data-bridge`, but the actual page object is only + // created during `gatsby develop`. + if (!page) { + return undefined + } const component = state.components.get(page.componentPath) const { path, componentPath, context } = page const { query } = component diff --git a/packages/gatsby/src/query/queue.ts b/packages/gatsby/src/query/queue.ts deleted file mode 100644 index 68152bae7602d..0000000000000 --- a/packages/gatsby/src/query/queue.ts +++ /dev/null @@ -1,169 +0,0 @@ -import Queue from "better-queue" -import { store } from "../redux" -import { memoryStoreWithPriorityBuckets } from "../query/better-queue-custom-store" -import { queryRunner } from "../query/query-runner" -import { websocketManager } from "../utils/websocket-manager" -import { GraphQLRunner, IGraphQLRunnerOptions } from "./graphql-runner" -import BetterQueue from "better-queue" -import { ProgressActivityTracker } from "../.." - -export type Task = any -type TaskResult = any - -if (process.env.GATSBY_EXPERIMENTAL_QUERY_CONCURRENCY) { - console.info( - `GATSBY_EXPERIMENTAL_QUERY_CONCURRENCY: Running with concurrency set to \`${process.env.GATSBY_EXPERIMENTAL_QUERY_CONCURRENCY}\`` - ) -} - -const createBaseOptions = (): Pick< - BetterQueue.QueueOptions, - "concurrent" | "store" -> => { - return { - concurrent: Number(process.env.GATSBY_EXPERIMENTAL_QUERY_CONCURRENCY) || 4, - store: memoryStoreWithPriorityBuckets(), - } -} - -const createBuildQueue = ( - graphqlRunner: GraphQLRunner, - runnerOptions: IGraphQLRunnerOptions = {} -): Queue => { - if (!graphqlRunner) { - graphqlRunner = new GraphQLRunner(store, runnerOptions) - } - - const queueOptions: BetterQueue.QueueOptions = { - ...createBaseOptions(), - async process({ job, activity }, callback): Promise { - try { - const result = await queryRunner(graphqlRunner, job, activity?.span) - callback(null, result) - } catch (e) { - callback(e) - } - }, - } - return new Queue(queueOptions) -} - -const createDevelopQueue = (getRunner: () => GraphQLRunner): Queue => { - const queueOptions: BetterQueue.QueueOptions = { - ...createBaseOptions(), - priority: ({ job }, cb): void => { - if (job.id && websocketManager.activePaths.has(job.id)) { - cb(null, 10) - } else { - cb(null, 1) - } - }, - merge: ( - _oldTask: Task, - newTask: Task, - cb: (err?: unknown, newTask?: Task) => void - ): void => { - cb(null, newTask) - }, - async process({ job: queryJob, activity }, callback): Promise { - try { - const result = await queryRunner(getRunner(), queryJob, activity?.span) - if (!queryJob.isPage) { - websocketManager.emitStaticQueryData({ - result, - id: queryJob.hash, - }) - } - - callback(null, result) - } catch (e) { - callback(e) - } - }, - } - - return new Queue(queueOptions) -} - -const createAppropriateQueue = ( - graphqlRunner: GraphQLRunner, - runnerOptions: IGraphQLRunnerOptions = {} -): Queue => { - if (process.env.NODE_ENV === `production`) { - return createBuildQueue(graphqlRunner, runnerOptions) - } - if (!graphqlRunner) { - graphqlRunner = new GraphQLRunner(store, runnerOptions) - } - return createDevelopQueue(() => graphqlRunner) -} - -/** - * Returns a promise that pushes jobs onto queue and resolves onces - * they're all finished processing (or rejects if one or more jobs - * fail) - * Note: queue is reused in develop so make sure to thoroughly cleanup hooks - */ -const processBatch = async ( - queue: Queue, - jobs: Array, - activity: ProgressActivityTracker -): Promise => { - if (jobs.length === 0) { - return Promise.resolve() - } - - return new Promise((resolve, reject) => { - let taskFinishCallback - - const gc = (): void => { - // eslint-disable-next-line @typescript-eslint/no-use-before-define - queue.off(`task_failed`, taskFailedCallback) - // eslint-disable-next-line @typescript-eslint/no-use-before-define - queue.off(`drain`, drainCallback) - if (taskFinishCallback) { - queue.off(`task_finish`, taskFinishCallback) - } - // We don't want to allow the variable to be null any other time, - // just when marking it as eligible for garbage collection. - // @ts-ignore - queue = null - } - - if (activity.tick) { - taskFinishCallback = (): unknown => activity.tick() - queue.on(`task_finish`, taskFinishCallback) - } - - const taskFailedCallback = (...err: Array): void => { - gc() - reject(err) - } - - const drainCallback = (): void => { - gc() - resolve() - } - - queue - // Note: the first arg is the path, the second the error - .on(`task_failed`, taskFailedCallback) - // Note: `drain` fires when all tasks _finish_ - // `empty` fires when queue is empty (but tasks are still running) - .on(`drain`, drainCallback) - - jobs.forEach(job => - queue.push({ - job, - activity, - }) - ) - }) -} - -export { - createBuildQueue, - createDevelopQueue, - processBatch, - createAppropriateQueue, -}