Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
0fbb8b0
feat: implement worker-based graph composition with Tinypool
StarpTech Mar 11, 2026
b2158ba
fix: remove unnecessary execArgv from worker configuration
StarpTech Mar 11, 2026
73d6a12
feat: add configuration for composition maxThreads in build server
StarpTech Mar 12, 2026
1528ac9
fix: update fieldConfigurations in serialized graph artifact handling
StarpTech Mar 12, 2026
b60334a
Merge branch 'main' into dustin/eng-8238-mitigate-event-loop-block-in…
StarpTech Mar 12, 2026
ea5a0d2
fix: enhance router execution config handling in graph composition
StarpTech Mar 12, 2026
165ab80
fix: make composition optional and handle maxThreads default in build…
StarpTech Mar 12, 2026
7eef5a2
fix: remove unused buildRouterExecutionConfig and handle router execu…
StarpTech Mar 12, 2026
db37952
fix: replace ComposedSubgraph with CompositionSubgraphRecord for impr…
StarpTech Mar 12, 2026
bd075ea
Merge branch 'main' into dustin/eng-8238-mitigate-event-loop-block-in…
StarpTech Mar 12, 2026
c5c1280
refactor: migrate to composeGraphsInWorker for improved graph composi…
StarpTech Mar 12, 2026
cd3c488
Merge branch 'main' into dustin/eng-8238-mitigate-event-loop-block-in…
StarpTech Mar 12, 2026
4e29b34
refactor: format code for improved readability in graph composition f…
StarpTech Mar 12, 2026
5a2301f
refactor: clean up unused types and improve router execution config h…
StarpTech Mar 12, 2026
27e5d4c
refactor: improve readability of shouldIncludeClientSchema assignment…
StarpTech Mar 12, 2026
bfb8ebc
refactor: update subgraphsToCompose type definition for clarity
StarpTech Mar 12, 2026
7877af7
Merge branch 'main' into dustin/eng-8238-mitigate-event-loop-block-in…
StarpTech Mar 12, 2026
acdc770
Merge branch 'main' into dustin/eng-8238-mitigate-event-loop-block-in…
StarpTech Mar 12, 2026
e5f9f7f
refactor: move validateRouterCompatibilityVersion function to composi…
StarpTech Mar 12, 2026
a9487d5
refactor: streamline router compatibility version handling and remove…
StarpTech Mar 12, 2026
0fc3f34
refactor: remove unused import of FieldConfiguration from composition…
StarpTech Mar 12, 2026
077dce8
refactor: enhance router compatibility version handling in composeGra…
StarpTech Mar 13, 2026
e8fede8
Merge branch 'main' into dustin/eng-8238-mitigate-event-loop-block-in…
StarpTech Mar 13, 2026
d650fa1
Merge branch 'main' into dustin/eng-8238-mitigate-event-loop-block-in…
StarpTech Mar 13, 2026
c46e6c8
Merge branch 'main' into dustin/eng-8238-mitigate-event-loop-block-in…
StarpTech Mar 14, 2026
3138b4a
Merge branch 'main' into dustin/eng-8238-mitigate-event-loop-block-in…
StarpTech Mar 16, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions controlplane/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
"stream-json": "^1.8.0",
"stripe": "^14.19.0",
"tiny-lru": "^11.2.11",
"tinypool": "^2.1.0",
"uid": "^2.0.2",
"uuid": "^10.0.0",
"zod": "^3.25.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ import {
CompositionWarning,
Subgraph,
} from '@wundergraph/cosmo-connect/dist/platform/v1/platform_pb';
import { parse } from 'graphql';
import { COMPOSITION_IGNORE_EXTERNAL_KEYS_FEATURE_ID } from '../../../types/index.js';
import { composeSubgraphs } from '../../composition/composition.js';
import { composeGraphsInWorker } from '../../composition/composeGraphs.pool.js';
import { FederatedGraphRepository } from '../../repositories/FederatedGraphRepository.js';
import { DefaultNamespace } from '../../repositories/NamespaceRepository.js';
import { OrganizationRepository } from '../../repositories/OrganizationRepository.js';
Expand Down Expand Up @@ -108,19 +107,25 @@ export function checkFederatedGraph(
featureId: COMPOSITION_IGNORE_EXTERNAL_KEYS_FEATURE_ID,
});

const result = composeSubgraphs(
subgraphsUsedForComposition.map((s) => ({
id: s.id,
name: s.name,
url: s.routingUrl,
definitions: parse(s.schemaSDL),
})),
federatedGraph.routerCompatibilityVersion,
{
const { results } = await composeGraphsInWorker({
federatedGraph,
subgraphsToCompose: [
{
subgraphs: subgraphsUsedForComposition,
isFeatureFlagComposition: false,
featureFlagName: '',
featureFlagId: '',
},
],
tagOptionsByContractName: [],
compositionOptions: {
disableResolvabilityValidation: req.disableResolvabilityValidation,
ignoreExternalKeys: ignoreExternalKeysFeature?.enabled ?? false,
},
);
skipRouterConfig: true,
});

const compositionResult = results[0].base;

// If req.limit is not provided, we return all rows
const returnLimit = req.limit === undefined ? null : clamp(req.limit, 1, maxRowLimitForChecks);
Expand All @@ -138,9 +143,9 @@ export function checkFederatedGraph(
};

const compositionWarnings: PlainMessage<CompositionWarning>[] = [];
counts.compositionWarnings = result.warnings.length;
counts.compositionWarnings = compositionResult.warnings.length;

const clampedWarnings = returnLimit ? result.warnings.slice(0, returnLimit) : result.warnings;
const clampedWarnings = returnLimit ? compositionResult.warnings.slice(0, returnLimit) : compositionResult.warnings;
for (const warning of clampedWarnings) {
compositionWarnings.push({
message: warning.message,
Expand All @@ -150,14 +155,14 @@ export function checkFederatedGraph(
});
}

if (!result.success) {
if (!compositionResult.success) {
const compositionErrors: PlainMessage<CompositionError>[] = [];
counts.compositionErrors = result.errors.length;
counts.compositionErrors = compositionResult.errors.length;

const clampedErrors = returnLimit ? result.errors.slice(0, returnLimit) : result.errors;
const clampedErrors = returnLimit ? compositionResult.errors.slice(0, returnLimit) : compositionResult.errors;
for (const error of clampedErrors) {
compositionErrors.push({
message: error.message,
message: error,
federatedGraphName: req.name,
namespace: federatedGraph.namespace,
featureFlag: '',
Expand Down
14 changes: 14 additions & 0 deletions controlplane/src/core/build-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,13 @@ import {
createReactivateOrganizationWorker,
ReactivateOrganizationQueue,
} from './workers/ReactivateOrganizationWorker.js';
import { configureComposeGraphsPool, destroyComposeGraphsPool } from './composition/composeGraphs.pool.js';

export interface BuildConfig {
logger: LoggerOptions;
composition?: {
maxThreads: number;
};
database: {
url: string;
tls?: {
Expand Down Expand Up @@ -157,6 +161,10 @@ const developmentLoggerOpts: LoggerOptions = {
};

export default async function build(opts: BuildConfig) {
configureComposeGraphsPool({
maxThreads: opts.composition?.maxThreads ?? 0,
});

opts.logger = {
timestamp: stdTimeFunctions.isoTime,
formatters: {
Expand Down Expand Up @@ -535,6 +543,12 @@ export default async function build(opts: BuildConfig) {
await Promise.all(bullWorkers.map((worker) => worker.close()));

fastify.log.debug('Bull workers shut down');

fastify.log.debug('Shutting down composition worker pool');

await destroyComposeGraphsPool();

fastify.log.debug('Composition worker pool shut down');
});

return fastify;
Expand Down
132 changes: 132 additions & 0 deletions controlplane/src/core/composition/composeGraphs.pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/**
* Main-thread bridge for composition worker execution.
*
* The worker only exchanges plain `Serialized*` payloads so we do not rely on
* structured cloning of rich runtime objects across the Tinypool boundary.
* Node 22 loads the source `.ts` worker natively in development, and the built
* `.js` worker in production.
*/
import { existsSync } from 'node:fs';
import { fileURLToPath } from 'node:url';
import { availableParallelism } from 'node:os';
import { Warning } from '@wundergraph/composition';
import { RouterConfig } from '@wundergraph/cosmo-connect/dist/node/v1/node_pb';
import WorkerPool from 'tinypool';
import { FederatedGraphDTO } from '../../types/index.js';
import { validateRouterCompatibilityVersion } from './composition.js';
import { ComposedFederatedGraph, CompositionSubgraphRecord } from './composer.js';
import {
ComposeGraphsTaskInput,
ComposeGraphsTaskResult,
SerializedComposedGraphArtifact,
} from './composeGraphs.types.js';

let composeGraphsPool: WorkerPool | undefined;
const composeGraphsPoolConfig = {
maxThreads: 0,
};

export interface ConfigureComposeGraphsPoolOptions {
maxThreads: number;
}

function getWorkerFilename() {
const sourceWorker = new URL('composeGraphs.worker.ts', import.meta.url);
if (existsSync(fileURLToPath(sourceWorker))) {
return {
filename: sourceWorker.href,
};
}

return {
filename: new URL('composeGraphs.worker.js', import.meta.url).href,
};
}

function getMaxThreads() {
if (composeGraphsPoolConfig.maxThreads > 0) {
return composeGraphsPoolConfig.maxThreads;
}

return Math.max(1, availableParallelism());
}

function getComposeGraphsPool() {
if (composeGraphsPool) {
return composeGraphsPool;
}

const { filename } = getWorkerFilename();

composeGraphsPool = new WorkerPool({
filename,
minThreads: 1,
maxThreads: getMaxThreads(),
concurrentTasksPerWorker: 2,
});

return composeGraphsPool;
}

function deserializeWarning(message: string, subgraphName?: string) {
return new Warning({
message,
subgraph: {
name: subgraphName || '',
},
});
}

export type DeserializedComposedGraph = Omit<ComposedFederatedGraph, 'subgraphs'> & {
subgraphs: CompositionSubgraphRecord[];
};

export function deserializeComposedGraphArtifact(
federatedGraph: Pick<FederatedGraphDTO, 'id' | 'targetId' | 'name' | 'namespace' | 'namespaceId'>,
artifact: SerializedComposedGraphArtifact,
): DeserializedComposedGraph {
return {
id: federatedGraph.id,
targetID: federatedGraph.targetId,
name: federatedGraph.name,
namespace: federatedGraph.namespace,
namespaceId: federatedGraph.namespaceId,
composedSchema: artifact.composedSchema,
federatedClientSchema: artifact.federatedClientSchema,
shouldIncludeClientSchema: artifact.shouldIncludeClientSchema,
errors: artifact.errors.map((message) => new Error(message)),
fieldConfigurations: artifact.fieldConfigurations,
subgraphs: artifact.subgraphs,
warnings: artifact.warnings.map((warning) => deserializeWarning(warning.message, warning.subgraphName)),
};
}

export function deserializeRouterExecutionConfig(routerExecutionConfigJson?: ReturnType<RouterConfig['toJson']>) {
if (!routerExecutionConfigJson) {
return;
}

return RouterConfig.fromJson(routerExecutionConfigJson);
}

export function composeGraphsInWorker(task: Omit<ComposeGraphsTaskInput, 'routerCompatibilityVersion'>) {
const fullTask: ComposeGraphsTaskInput = {
...task,
routerCompatibilityVersion: validateRouterCompatibilityVersion(task.federatedGraph.routerCompatibilityVersion),
};
return getComposeGraphsPool().run(fullTask) as Promise<ComposeGraphsTaskResult>;
}

export function configureComposeGraphsPool(options: ConfigureComposeGraphsPoolOptions) {
composeGraphsPoolConfig.maxThreads = options.maxThreads;
}

export async function destroyComposeGraphsPool() {
if (!composeGraphsPool) {
return;
}

const pool = composeGraphsPool;
composeGraphsPool = undefined;
await pool.destroy();
}
80 changes: 80 additions & 0 deletions controlplane/src/core/composition/composeGraphs.types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/**
* These types define the thread boundary for compose-and-deploy work.
*
* The `Serialized*` prefix is intentional: these payloads are flattened to
* structured-clone-safe data before crossing the Tinypool worker boundary.
* Rich runtime objects such as GraphQL schema instances, Maps, protobuf
* classes, and custom Error/Warning instances are reconstructed outside the
* worker when needed.
*/
import type {
CompositionOptions,
FieldConfiguration,
SupportedRouterCompatibilityVersion,
} from '@wundergraph/composition';
import { RouterConfig } from '@wundergraph/cosmo-connect/dist/node/v1/node_pb';
import { FederatedGraphDTO, SubgraphDTO } from '../../types/index.js';

export interface SerializedContractTagOptions {
contractName: string;
excludeTags: string[];
includeTags: string[];
}

export interface SerializedCompositionWarning {
message: string;
subgraphName?: string;
}

export interface SerializedComposedSubgraph {
id: string;
isFeatureSubgraph: boolean;
name: string;
sdl: string;
schemaVersionId: string;
targetId: string;
}

export interface SerializedComposedGraphArtifact {
success: boolean;
errors: string[];
warnings: SerializedCompositionWarning[];
composedSchema?: string;
federatedClientSchema?: string;
shouldIncludeClientSchema: boolean;
fieldConfigurations: FieldConfiguration[];
subgraphs: SerializedComposedSubgraph[];
routerExecutionConfigJson?: ReturnType<RouterConfig['toJson']>;
}

export interface SerializedContractCompositionArtifact {
contractName: string;
artifact: SerializedComposedGraphArtifact;
}

export interface ComposeGraphsTaskInput {
federatedGraph: FederatedGraphDTO;
/** Pre-validated on the main thread before dispatching to the worker. */
routerCompatibilityVersion: SupportedRouterCompatibilityVersion;
subgraphsToCompose: {
subgraphs: SubgraphDTO[];
isFeatureFlagComposition: boolean;
featureFlagName: string;
featureFlagId: string;
}[];
tagOptionsByContractName: SerializedContractTagOptions[];
compositionOptions?: CompositionOptions;
skipRouterConfig?: boolean;
}

export interface ComposeGraphsTaskResultItem {
isFeatureFlagComposition: boolean;
featureFlagName: string;
featureFlagId: string;
base: SerializedComposedGraphArtifact;
contracts: SerializedContractCompositionArtifact[];
}

export interface ComposeGraphsTaskResult {
results: ComposeGraphsTaskResultItem[];
}
Loading
Loading