-
Notifications
You must be signed in to change notification settings - Fork 226
feat: implement worker-based graph composition with Tinypool #2637
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
StarpTech
merged 26 commits into
main
from
dustin/eng-8238-mitigate-event-loop-block-in-cosmo-controlplane
Mar 16, 2026
Merged
Changes from all commits
Commits
Show all changes
26 commits
Select commit
Hold shift + click to select a range
0fbb8b0
feat: implement worker-based graph composition with Tinypool
StarpTech b2158ba
fix: remove unnecessary execArgv from worker configuration
StarpTech 73d6a12
feat: add configuration for composition maxThreads in build server
StarpTech 1528ac9
fix: update fieldConfigurations in serialized graph artifact handling
StarpTech b60334a
Merge branch 'main' into dustin/eng-8238-mitigate-event-loop-block-in…
StarpTech ea5a0d2
fix: enhance router execution config handling in graph composition
StarpTech 165ab80
fix: make composition optional and handle maxThreads default in build…
StarpTech 7eef5a2
fix: remove unused buildRouterExecutionConfig and handle router execu…
StarpTech db37952
fix: replace ComposedSubgraph with CompositionSubgraphRecord for impr…
StarpTech bd075ea
Merge branch 'main' into dustin/eng-8238-mitigate-event-loop-block-in…
StarpTech c5c1280
refactor: migrate to composeGraphsInWorker for improved graph composi…
StarpTech cd3c488
Merge branch 'main' into dustin/eng-8238-mitigate-event-loop-block-in…
StarpTech 4e29b34
refactor: format code for improved readability in graph composition f…
StarpTech 5a2301f
refactor: clean up unused types and improve router execution config h…
StarpTech 27e5d4c
refactor: improve readability of shouldIncludeClientSchema assignment…
StarpTech bfb8ebc
refactor: update subgraphsToCompose type definition for clarity
StarpTech 7877af7
Merge branch 'main' into dustin/eng-8238-mitigate-event-loop-block-in…
StarpTech acdc770
Merge branch 'main' into dustin/eng-8238-mitigate-event-loop-block-in…
StarpTech e5f9f7f
refactor: move validateRouterCompatibilityVersion function to composi…
StarpTech a9487d5
refactor: streamline router compatibility version handling and remove…
StarpTech 0fc3f34
refactor: remove unused import of FieldConfiguration from composition…
StarpTech 077dce8
refactor: enhance router compatibility version handling in composeGra…
StarpTech e8fede8
Merge branch 'main' into dustin/eng-8238-mitigate-event-loop-block-in…
StarpTech d650fa1
Merge branch 'main' into dustin/eng-8238-mitigate-event-loop-block-in…
StarpTech c46e6c8
Merge branch 'main' into dustin/eng-8238-mitigate-event-loop-block-in…
StarpTech 3138b4a
Merge branch 'main' into dustin/eng-8238-mitigate-event-loop-block-in…
StarpTech File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
132 changes: 132 additions & 0 deletions
132
controlplane/src/core/composition/composeGraphs.pool.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,132 @@ | ||
| /** | ||
| * Main-thread bridge for composition worker execution. | ||
| * | ||
| * The worker only exchanges plain `Serialized*` payloads so we do not rely on | ||
| * structured cloning of rich runtime objects across the Tinypool boundary. | ||
| * Node 22 loads the source `.ts` worker natively in development, and the built | ||
| * `.js` worker in production. | ||
| */ | ||
| import { existsSync } from 'node:fs'; | ||
| import { fileURLToPath } from 'node:url'; | ||
| import { availableParallelism } from 'node:os'; | ||
| import { Warning } from '@wundergraph/composition'; | ||
| import { RouterConfig } from '@wundergraph/cosmo-connect/dist/node/v1/node_pb'; | ||
| import WorkerPool from 'tinypool'; | ||
| import { FederatedGraphDTO } from '../../types/index.js'; | ||
| import { validateRouterCompatibilityVersion } from './composition.js'; | ||
| import { ComposedFederatedGraph, CompositionSubgraphRecord } from './composer.js'; | ||
| import { | ||
| ComposeGraphsTaskInput, | ||
| ComposeGraphsTaskResult, | ||
| SerializedComposedGraphArtifact, | ||
| } from './composeGraphs.types.js'; | ||
|
|
||
| let composeGraphsPool: WorkerPool | undefined; | ||
| const composeGraphsPoolConfig = { | ||
| maxThreads: 0, | ||
| }; | ||
|
|
||
| export interface ConfigureComposeGraphsPoolOptions { | ||
| maxThreads: number; | ||
| } | ||
|
|
||
| function getWorkerFilename() { | ||
| const sourceWorker = new URL('composeGraphs.worker.ts', import.meta.url); | ||
| if (existsSync(fileURLToPath(sourceWorker))) { | ||
| return { | ||
| filename: sourceWorker.href, | ||
| }; | ||
| } | ||
|
|
||
| return { | ||
| filename: new URL('composeGraphs.worker.js', import.meta.url).href, | ||
| }; | ||
| } | ||
|
|
||
| function getMaxThreads() { | ||
| if (composeGraphsPoolConfig.maxThreads > 0) { | ||
| return composeGraphsPoolConfig.maxThreads; | ||
| } | ||
|
|
||
| return Math.max(1, availableParallelism()); | ||
| } | ||
|
|
||
| function getComposeGraphsPool() { | ||
| if (composeGraphsPool) { | ||
| return composeGraphsPool; | ||
| } | ||
|
|
||
| const { filename } = getWorkerFilename(); | ||
|
|
||
| composeGraphsPool = new WorkerPool({ | ||
| filename, | ||
| minThreads: 1, | ||
| maxThreads: getMaxThreads(), | ||
| concurrentTasksPerWorker: 2, | ||
| }); | ||
|
|
||
| return composeGraphsPool; | ||
| } | ||
|
|
||
| function deserializeWarning(message: string, subgraphName?: string) { | ||
| return new Warning({ | ||
| message, | ||
| subgraph: { | ||
| name: subgraphName || '', | ||
StarpTech marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| }, | ||
| }); | ||
| } | ||
|
|
||
| export type DeserializedComposedGraph = Omit<ComposedFederatedGraph, 'subgraphs'> & { | ||
| subgraphs: CompositionSubgraphRecord[]; | ||
| }; | ||
|
|
||
| export function deserializeComposedGraphArtifact( | ||
| federatedGraph: Pick<FederatedGraphDTO, 'id' | 'targetId' | 'name' | 'namespace' | 'namespaceId'>, | ||
| artifact: SerializedComposedGraphArtifact, | ||
| ): DeserializedComposedGraph { | ||
| return { | ||
| id: federatedGraph.id, | ||
| targetID: federatedGraph.targetId, | ||
| name: federatedGraph.name, | ||
| namespace: federatedGraph.namespace, | ||
| namespaceId: federatedGraph.namespaceId, | ||
| composedSchema: artifact.composedSchema, | ||
| federatedClientSchema: artifact.federatedClientSchema, | ||
| shouldIncludeClientSchema: artifact.shouldIncludeClientSchema, | ||
| errors: artifact.errors.map((message) => new Error(message)), | ||
| fieldConfigurations: artifact.fieldConfigurations, | ||
| subgraphs: artifact.subgraphs, | ||
| warnings: artifact.warnings.map((warning) => deserializeWarning(warning.message, warning.subgraphName)), | ||
| }; | ||
StarpTech marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| export function deserializeRouterExecutionConfig(routerExecutionConfigJson?: ReturnType<RouterConfig['toJson']>) { | ||
| if (!routerExecutionConfigJson) { | ||
| return; | ||
| } | ||
|
|
||
| return RouterConfig.fromJson(routerExecutionConfigJson); | ||
| } | ||
|
|
||
| export function composeGraphsInWorker(task: Omit<ComposeGraphsTaskInput, 'routerCompatibilityVersion'>) { | ||
| const fullTask: ComposeGraphsTaskInput = { | ||
| ...task, | ||
| routerCompatibilityVersion: validateRouterCompatibilityVersion(task.federatedGraph.routerCompatibilityVersion), | ||
| }; | ||
| return getComposeGraphsPool().run(fullTask) as Promise<ComposeGraphsTaskResult>; | ||
| } | ||
|
|
||
| export function configureComposeGraphsPool(options: ConfigureComposeGraphsPoolOptions) { | ||
| composeGraphsPoolConfig.maxThreads = options.maxThreads; | ||
| } | ||
|
|
||
| export async function destroyComposeGraphsPool() { | ||
| if (!composeGraphsPool) { | ||
| return; | ||
| } | ||
|
|
||
| const pool = composeGraphsPool; | ||
| composeGraphsPool = undefined; | ||
| await pool.destroy(); | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
| /** | ||
| * These types define the thread boundary for compose-and-deploy work. | ||
| * | ||
| * The `Serialized*` prefix is intentional: these payloads are flattened to | ||
| * structured-clone-safe data before crossing the Tinypool worker boundary. | ||
| * Rich runtime objects such as GraphQL schema instances, Maps, protobuf | ||
| * classes, and custom Error/Warning instances are reconstructed outside the | ||
| * worker when needed. | ||
| */ | ||
| import type { | ||
| CompositionOptions, | ||
| FieldConfiguration, | ||
| SupportedRouterCompatibilityVersion, | ||
| } from '@wundergraph/composition'; | ||
| import { RouterConfig } from '@wundergraph/cosmo-connect/dist/node/v1/node_pb'; | ||
| import { FederatedGraphDTO, SubgraphDTO } from '../../types/index.js'; | ||
|
|
||
| export interface SerializedContractTagOptions { | ||
| contractName: string; | ||
| excludeTags: string[]; | ||
StarpTech marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| includeTags: string[]; | ||
| } | ||
|
|
||
| export interface SerializedCompositionWarning { | ||
| message: string; | ||
| subgraphName?: string; | ||
| } | ||
|
|
||
| export interface SerializedComposedSubgraph { | ||
| id: string; | ||
| isFeatureSubgraph: boolean; | ||
| name: string; | ||
| sdl: string; | ||
| schemaVersionId: string; | ||
| targetId: string; | ||
| } | ||
|
|
||
| export interface SerializedComposedGraphArtifact { | ||
| success: boolean; | ||
| errors: string[]; | ||
| warnings: SerializedCompositionWarning[]; | ||
| composedSchema?: string; | ||
| federatedClientSchema?: string; | ||
| shouldIncludeClientSchema: boolean; | ||
| fieldConfigurations: FieldConfiguration[]; | ||
| subgraphs: SerializedComposedSubgraph[]; | ||
| routerExecutionConfigJson?: ReturnType<RouterConfig['toJson']>; | ||
| } | ||
|
|
||
| export interface SerializedContractCompositionArtifact { | ||
| contractName: string; | ||
| artifact: SerializedComposedGraphArtifact; | ||
| } | ||
|
|
||
| export interface ComposeGraphsTaskInput { | ||
| federatedGraph: FederatedGraphDTO; | ||
| /** Pre-validated on the main thread before dispatching to the worker. */ | ||
| routerCompatibilityVersion: SupportedRouterCompatibilityVersion; | ||
| subgraphsToCompose: { | ||
| subgraphs: SubgraphDTO[]; | ||
| isFeatureFlagComposition: boolean; | ||
| featureFlagName: string; | ||
| featureFlagId: string; | ||
| }[]; | ||
| tagOptionsByContractName: SerializedContractTagOptions[]; | ||
| compositionOptions?: CompositionOptions; | ||
| skipRouterConfig?: boolean; | ||
| } | ||
|
|
||
| export interface ComposeGraphsTaskResultItem { | ||
| isFeatureFlagComposition: boolean; | ||
| featureFlagName: string; | ||
| featureFlagId: string; | ||
| base: SerializedComposedGraphArtifact; | ||
| contracts: SerializedContractCompositionArtifact[]; | ||
| } | ||
|
|
||
| export interface ComposeGraphsTaskResult { | ||
| results: ComposeGraphsTaskResultItem[]; | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.