Skip to content

Commit

Permalink
feat(gatsby): PQR worker can run page queries (#32017)
Browse files Browse the repository at this point in the history
* run queries in child/schema

* split schema into schema & queries, add new test

* wip

* test static queries

* revert changes related to page queries

* initial

* smaller ts improvements

removing unused types, consolidate them, export IQueryJob

* convert query/index.js to TS + change pageQueryIds type to Array<IGatsbyPage>

* wip

* use .cache/worker folder for slices location

* optional chaining, test context variables

* make cloud tests pass?

* add runQueriesInWorkers function

* remove runQueriesInWorkersQueue from build command for now

* add test for runQueriesInWorkersQueue

* adapt activity

* first part of review comments

* wip test

* finalize test

* proper test with jest-extended

Co-authored-by: pieh@users.noreply.github.com

* typescript magic

* revert createProgress changes
  • Loading branch information
LekoArts authored Jun 24, 2021
1 parent 069cb53 commit 44257b6
Show file tree
Hide file tree
Showing 17 changed files with 401 additions and 105 deletions.
1 change: 1 addition & 0 deletions jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,5 @@ module.exports = {
testEnvironment: `jest-environment-jsdom-fourteen`,
moduleFileExtensions: [`js`, `jsx`, `ts`, `tsx`, `json`],
setupFiles: [`<rootDir>/.jestSetup.js`],
setupFilesAfterEnv: [`jest-extended`],
}
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"ignore": "^5.1.8",
"jest": "^24.9.0",
"jest-cli": "^24.9.0",
"jest-extended": "^0.11.5",
"jest-environment-jsdom-fourteen": "^0.1.0",
"jest-junit": "^10.0.0",
"jest-serializer-path": "^0.1.15",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
const _ = require(`lodash`)
const fastq = require(`fastq`)
const { store } = require(`../redux`)
const { hasFlag, FLAG_ERROR_EXTRACTION } = require(`../redux/reducers/queries`)
const { queryRunner } = require(`./query-runner`)
const { websocketManager } = require(`../utils/websocket-manager`)
const { GraphQLRunner } = require(`./graphql-runner`)
import _ from "lodash"
import fastq from "fastq"
import { IProgressReporter } from "gatsby-cli/lib/reporter/reporter-progress"
import { store } from "../redux"
import { IGatsbyPage, IGatsbyState } from "../redux/types"
import { hasFlag, FLAG_ERROR_EXTRACTION } from "../redux/reducers/queries"
import { IQueryJob, queryRunner } from "./query-runner"
import {
IStaticQueryResult,
websocketManager,
} from "../utils/websocket-manager"
import { GraphQLRunner } from "./graphql-runner"
import { IGroupedQueryIds } from "../services"

if (process.env.GATSBY_EXPERIMENTAL_QUERY_CONCURRENCY) {
console.info(
Expand All @@ -21,7 +27,7 @@ const concurrency =
* Dirty state is tracked in `queries` reducer, here we simply filter
* them from all tracked queries.
*/
function calcDirtyQueryIds(state) {
export function calcDirtyQueryIds(state: IGatsbyState): Array<string> {
const { trackedQueries, trackedComponents, deletedQueries } = state.queries

const queriesWithBabelErrors = new Set()
Expand All @@ -33,7 +39,7 @@ function calcDirtyQueryIds(state) {
}
}
// Note: trackedQueries contains both - page and static query ids
const dirtyQueryIds = []
const dirtyQueryIds: Array<string> = []
for (const [queryId, query] of trackedQueries) {
if (deletedQueries.has(queryId)) {
continue
Expand All @@ -45,32 +51,49 @@ function calcDirtyQueryIds(state) {
return dirtyQueryIds
}

export { calcDirtyQueryIds as calcInitialDirtyQueryIds }

/**
* groups queryIds by whether they are static or page queries.
* Groups queryIds by whether they are static or page queries.
*/
function groupQueryIds(queryIds) {
export function groupQueryIds(queryIds: Array<string>): IGroupedQueryIds {
const grouped = _.groupBy(queryIds, p =>
p.slice(0, 4) === `sq--` ? `static` : `page`
)

const { pages } = store.getState()

return {
staticQueryIds: grouped.static || [],
pageQueryIds: grouped.page || [],
staticQueryIds: grouped?.static || [],
pageQueryIds:
grouped?.page
?.map(path => pages.get(path) as IGatsbyPage)
?.filter(Boolean) || [],
}
}

function createQueue({
function createQueue<QueryIDType>({
createJobFn,
state,
activity,
graphqlRunner,
graphqlTracing,
}) {
}: {
createJobFn: (
state: IGatsbyState,
queryId: QueryIDType
) => IQueryJob | undefined
state: IGatsbyState
activity: IProgressReporter
graphqlRunner: GraphQLRunner
graphqlTracing: boolean
}): fastq.queue<QueryIDType, any> {
if (!graphqlRunner) {
graphqlRunner = new GraphQLRunner(store, { graphqlTracing })
}
state = state || store.getState()

function worker(queryId, cb) {
function worker(queryId: QueryIDType, cb): void {
const job = createJobFn(state, queryId)
if (!job) {
cb(null, undefined)
Expand All @@ -91,15 +114,28 @@ function createQueue({
return fastq(worker, concurrency)
}

async function processQueries({
async function processQueries<QueryIDType>({
queryIds,
createJobFn,
onQueryDone,
state,
activity,
graphqlRunner,
graphqlTracing,
}) {
}: {
queryIds: Array<QueryIDType>
createJobFn: (
state: IGatsbyState,
queryId: QueryIDType
) => IQueryJob | undefined
onQueryDone:
| (({ job, result }: { job: IQueryJob; result: unknown }) => void)
| undefined
state: IGatsbyState
activity: IProgressReporter
graphqlRunner: GraphQLRunner
graphqlTracing: boolean
}): Promise<void> {
return new Promise((resolve, reject) => {
const fastQueue = createQueue({
createJobFn,
Expand All @@ -109,7 +145,7 @@ async function processQueries({
graphqlTracing,
})

queryIds.forEach(queryId => {
queryIds.forEach((queryId: QueryIDType) => {
fastQueue.push(queryId, (err, res) => {
if (err) {
fastQueue.kill()
Expand All @@ -123,40 +159,57 @@ async function processQueries({
})

if (!fastQueue.idle()) {
fastQueue.drain = () => resolve()
fastQueue.drain = (): any => resolve()
} else {
resolve()
}
})
}

function createStaticQueryJob(state, queryId) {
function createStaticQueryJob(
state: IGatsbyState,
queryId: string
): IQueryJob | undefined {
const component = state.staticQueryComponents.get(queryId)

if (!component) {
return undefined
}

const { hash, id, query, componentPath } = component

return {
id: queryId,
hash,
query,
isPage: false,
hash,
componentPath,
context: { path: id },
}
}

function onDevelopStaticQueryDone({ job, result }) {
function onDevelopStaticQueryDone({
job,
result,
}: {
job: IQueryJob
result: IStaticQueryResult["result"]
}): void {
if (!job.hash) {
return
}

websocketManager.emitStaticQueryData({
result,
id: job.hash,
})
}

async function processStaticQueries(
queryIds,
export async function processStaticQueries(
queryIds: IGroupedQueryIds["staticQueryIds"],
{ state, activity, graphqlRunner, graphqlTracing }
) {
return processQueries({
): Promise<void> {
return processQueries<string>({
queryIds,
createJobFn: createStaticQueryJob,
onQueryDone:
Expand All @@ -170,33 +223,34 @@ async function processStaticQueries(
})
}

async function processPageQueries(
queryIds,
export async function processPageQueries(
queryIds: IGroupedQueryIds["pageQueryIds"],
{ state, activity, graphqlRunner, graphqlTracing }
) {
return processQueries({
): Promise<void> {
return processQueries<IGatsbyPage>({
queryIds,
createJobFn: createPageQueryJob,
onQueryDone: undefined,
state,
activity,
graphqlRunner,
graphqlTracing,
})
}

function createPageQueryJob(state, queryId) {
const page = state.pages.get(queryId)
function createPageQueryJob(
state: IGatsbyState,
page: IGatsbyPage
): IQueryJob | undefined {
const component = state.components.get(page.componentPath)

// Make sure we filter out pages that don't exist. An example is
// /dev-404-page/, whose SitePage node is created via
// `internal-data-bridge`, but the actual page object is only
// created during `gatsby develop`.
if (!page) {
if (!component) {
return undefined
}
const component = state.components.get(page.componentPath)

const { path, componentPath, context } = page
const { query } = component

return {
id: path,
query,
Expand All @@ -208,11 +262,3 @@ function createPageQueryJob(state, queryId) {
},
}
}

module.exports = {
calcInitialDirtyQueryIds: calcDirtyQueryIds,
calcDirtyQueryIds,
processPageQueries,
processStaticQueries,
groupQueryIds,
}
4 changes: 2 additions & 2 deletions packages/gatsby/src/query/query-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ import { pageDataExists } from "../utils/page-data"

const resultHashes = new Map()

interface IQueryJob {
export interface IQueryJob {
id: string
hash?: string
query: string
componentPath: string
context: PageContext
isPage: boolean
pluginCreatorId: string
pluginCreatorId?: string
}

function reportLongRunningQueryJob(queryJob): void {
Expand Down
2 changes: 1 addition & 1 deletion packages/gatsby/src/query/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export interface IGraphQLRunnerStatResults {
uniqueSorts: number
}

export type PageContext = any
export type PageContext = Record<string, any>

export interface IExecutionResult extends ExecutionResult {
pageContext?: PageContext
Expand Down
Loading

0 comments on commit 44257b6

Please sign in to comment.