Skip to content

Commit

Permalink
refactor(gatsby): Make query queue constructable (#13061)
Browse files Browse the repository at this point in the history
## Description

#13004 requires that page queries be executed _after_ the javascript build completes. To get there, we need to be able to process a batch of queries. The current code relies on a global query queue, and uses complex global events to ensure it's paused/resumed during bootstrap. This PR makes the queue constructable so that a batch of queries can be processed easily. 

I also noticed that `develop` queue is much more complex that the `build` queue, so I've separated those into two queue constructors.

## Related Issues

- Sub-PR of #13004
- builds on #13016
  • Loading branch information
Moocar authored and wardpeet committed Apr 10, 2019
1 parent 9a75aff commit 3ca49f2
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 215 deletions.
17 changes: 1 addition & 16 deletions packages/gatsby/src/bootstrap/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ const md5File = require(`md5-file/promise`)
const crypto = require(`crypto`)
const del = require(`del`)
const path = require(`path`)
const convertHrtime = require(`convert-hrtime`)
const Promise = require(`bluebird`)
const telemetry = require(`gatsby-telemetry`)

Expand All @@ -33,7 +32,6 @@ process.on(`unhandledRejection`, (reason, p) => {

const { extractQueries } = require(`../query/query-watcher`)
const { runInitialQueries } = require(`../query/page-query-runner`)
const queryQueue = require(`../query/query-queue`)
const { writePages } = require(`../query/pages-writer`)
const { writeRedirects } = require(`./redirects-writer`)

Expand Down Expand Up @@ -459,20 +457,7 @@ module.exports = async (args: BootstrapArgs) => {
parentSpan: bootstrapSpan,
})
activity.start()
const startQueries = process.hrtime()
queryQueue.on(`task_finish`, () => {
const stats = queryQueue.getStats()
activity.setStatus(
`${stats.total}/${stats.peak} ${(
stats.total / convertHrtime(process.hrtime(startQueries)).seconds
).toFixed(2)} queries/second`
)
})
// HACKY!!! TODO: REMOVE IN NEXT REFACTOR
emitter.emit(`START_QUERY_QUEUE`)
// END HACKY
runInitialQueries(activity)
await new Promise(resolve => queryQueue.on(`drain`, resolve))
await runInitialQueries(activity)
activity.end()

require(`../redux/actions`).boundActionCreators.setProgramStatus(
Expand Down
3 changes: 3 additions & 0 deletions packages/gatsby/src/commands/develop.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ const db = require(`../db`)
const telemetry = require(`gatsby-telemetry`)
const detectPortInUseAndPrompt = require(`../utils/detect-port-in-use-and-prompt`)
const onExit = require(`signal-exit`)
const pageQueryRunner = require(`../query/page-query-runner`)
const queryQueue = require(`../query/queue`)
const queryWatcher = require(`../query/query-watcher`)

// const isInteractive = process.stdout.isTTY
Expand Down Expand Up @@ -90,6 +92,7 @@ async function startServer(program) {
await bootstrap(program)

db.startAutosave()
pageQueryRunner.startListening(queryQueue.makeDevelop())
queryWatcher.startWatchDeletePage()

await createIndexHtml()
Expand Down
151 changes: 80 additions & 71 deletions packages/gatsby/src/query/page-query-runner.js
Original file line number Diff line number Diff line change
@@ -1,44 +1,20 @@
// @flow

import type { QueryJob } from "../query-runner"

/**
* Jobs of this module
* - Ensure on bootstrap that all invalid page queries are run and report
* when this is done
* - Watch for when a page's query is invalidated and re-run it.
*/

const _ = require(`lodash`)

const queue = require(`./query-queue`)
const Queue = require(`better-queue`)
const convertHrtime = require(`convert-hrtime`)
const { store, emitter } = require(`../redux`)
const queryQueue = require(`./queue`)

let queuedDirtyActions = []

let active = false
let running = false

const runQueriesForPathnamesQueue = new Set()
exports.queueQueryForPathname = pathname => {
const queueQueryForPathname = pathname => {
runQueriesForPathnamesQueue.add(pathname)
}

// Do initial run of graphql queries during bootstrap.
// Afterwards we listen "API_RUNNING_QUEUE_EMPTY" and check
// for dirty nodes before running queries.
exports.runInitialQueries = async () => {
active = true
await runQueries(true)
return
}

const runQueries = async (initial = false) => {
// Don't run queries until bootstrap gets to "run graphql queries"
if (!active) {
return
}

const calcQueries = (initial = false) => {
// Find paths dependent on dirty nodes
queuedDirtyActions = _.uniq(queuedDirtyActions, a => a.payload.id)
const dirtyIds = findDirtyIds(queuedDirtyActions)
Expand Down Expand Up @@ -71,13 +47,9 @@ const runQueries = async (initial = false) => {

runQueriesForPathnamesQueue.clear()

// Run these paths
await runQueriesForPathnames(pathnamesToRun)
return
return pathnamesToRun
}

exports.runQueries = runQueries

emitter.on(`CREATE_NODE`, action => {
queuedDirtyActions.push(action)
})
Expand All @@ -86,26 +58,6 @@ emitter.on(`DELETE_NODE`, action => {
queuedDirtyActions.push({ payload: action.payload })
})

const runQueuedActions = async () => {
if (active && !running) {
try {
running = true
await runQueries()
} finally {
running = false
if (queuedDirtyActions.length > 0) {
runQueuedActions()
}
}
}
}
exports.runQueuedActions = runQueuedActions

// Wait until all plugins have finished running (e.g. various
// transformer plugins) before running queries so we don't
// query things in a 1/2 finished state.
emitter.on(`API_RUNNING_QUEUE_EMPTY`, runQueuedActions)

let seenIdsWithoutDataDependencies = []

// Remove pages from seenIdsWithoutDataDependencies when they're deleted
Expand Down Expand Up @@ -147,10 +99,11 @@ const findIdsWithoutDataDependencies = () => {
return notTrackedIds
}

const runQueriesForPathnames = pathnames => {
const makeQueryJobs = pathnames => {
const staticQueries = pathnames.filter(p => p.slice(0, 4) === `sq--`)
const pageQueries = pathnames.filter(p => p.slice(0, 4) !== `sq--`)
const state = store.getState()
const queryJobs = []

staticQueries.forEach(id => {
const staticQueryComponent = store.getState().staticQueryComponents.get(id)
Expand All @@ -162,16 +115,14 @@ const runQueriesForPathnames = pathnames => {
componentPath: staticQueryComponent.componentPath,
context: { path: staticQueryComponent.jsonName },
}
queue.push(queryJob)
queryJobs.push(queryJob)
})

const pages = state.pages
let didNotQueueItems = true
pageQueries.forEach(id => {
const page = pages.get(id)
if (page) {
didNotQueueItems = false
queue.push(
queryJobs.push(
({
id: page.path,
jsonName: page.jsonName,
Expand All @@ -186,18 +137,7 @@ const runQueriesForPathnames = pathnames => {
)
}
})

if (didNotQueueItems || !pathnames || pathnames.length === 0) {
return Promise.resolve()
}

return new Promise(resolve => {
const onDrain = () => {
queue.removeListener(`drain`, onDrain)
resolve()
}
queue.on(`drain`, onDrain)
})
return queryJobs
}

const findDirtyIds = actions => {
Expand All @@ -221,3 +161,72 @@ const findDirtyIds = actions => {
)
return uniqDirties
}

const runInitialQueries = async activity => {
const pathnamesToRun = calcQueries(true)
if (pathnamesToRun.length === 0) {
return
}

const queryJobs = makeQueryJobs(pathnamesToRun)

const queue = queryQueue.makeBuild()

const startQueries = process.hrtime()
queue.on(`task_finish`, () => {
const stats = queue.getStats()
activity.setStatus(
`${stats.total}/${stats.peak} ${(
stats.total / convertHrtime(process.hrtime(startQueries)).seconds
).toFixed(2)} queries/second`
)
})
await queryQueue.processBatch(queue, queryJobs)
}

/////////////////////////////////////////////////////////////////////
// Listener for gatsby develop

// Initialized via `startListening`
let listenerQueue

/**
* Run any dirty queries. See `calcQueries` for what constitutes a
* dirty query
*/
const runQueuedQueries = () => {
if (listenerQueue) {
listenerQueue.push(makeQueryJobs(calcQueries(false)))
}
}

/**
* Starts a background process that processes any dirty queries
* whenever one of the following occurs:
*
* 1. A node has changed (but only after the api call has finished
* running)
* 2. A component query (e.g by editing a React Component) has
* changed
*
* For what constitutes a dirty query, see `calcQueries`
*/
const startListening = queue => {
// We use a queue to process batches of queries so that they are
// processed consecutively
listenerQueue = new Queue((queryJobs, callback) =>
queryQueue
.processBatch(queue, queryJobs)
.then(() => callback(null))
.catch(callback)
)

emitter.on(`API_RUNNING_QUEUE_EMPTY`, runQueuedQueries)
}

module.exports = {
runInitialQueries,
startListening,
runQueuedQueries,
queueQueryForPathname,
}
106 changes: 0 additions & 106 deletions packages/gatsby/src/query/query-queue.js

This file was deleted.

Loading

0 comments on commit 3ca49f2

Please sign in to comment.