Skip to content

feat: implement worker-based graph composition with Tinypool#2637

Merged
StarpTech merged 26 commits intomainfrom
dustin/eng-8238-mitigate-event-loop-block-in-cosmo-controlplane
Mar 16, 2026
Merged

feat: implement worker-based graph composition with Tinypool#2637
StarpTech merged 26 commits intomainfrom
dustin/eng-8238-mitigate-event-loop-block-in-cosmo-controlplane

Conversation

@StarpTech
Copy link
Copy Markdown
Contributor

@StarpTech StarpTech commented Mar 11, 2026

Summary by CodeRabbit

  • New Features

    • Composition now runs in parallel worker processes for faster, isolated federation and contract-aware composition.
    • New environment setting to control composition worker concurrency (COMPOSITION_MAX_THREADS).
  • Refactor

    • Composition pipeline reworked to produce flattened, worker-generated composition artifacts for simpler aggregation and clearer error/warning handling.
  • Chores

    • Managed lifecycle for composition workers (initialize on start, graceful shutdown) to reduce resource leaks and improve reliability.

Checklist

  • I have discussed my proposed changes in an issue and have received approval to proceed.
  • I have followed the coding standards of the project.
  • Tests or benchmarks have been added or updated.
  • Documentation has been updated on https://github.com/wundergraph/cosmo-docs.
  • I have read the Contributors Guide.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Mar 11, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

Adds a Tinypool-based worker pool and worker entry for federated graph composition, new serialized worker types, wiring into server lifecycle and repository composition flow, and an env/config flag plus dependency to control pool max threads. (≤50 words)

Changes

Cohort / File(s) Summary
Dependencies
controlplane/package.json
Added tinypool ^2.1.0 runtime dependency.
Worker Pool Bridge
controlplane/src/core/composition/composeGraphs.pool.ts
New main-thread pool module: resolves worker filename, lazy Tinypool singleton, task dispatch (composeGraphsInWorker), deserialization helpers, configureComposeGraphsPool and destroyComposeGraphsPool APIs.
Worker Implementation
controlplane/src/core/composition/composeGraphs.worker.ts
New worker entry that performs federation composition: input validation, composition orchestration (graph/contract), artifact serialization, and returns structured, clone-safe results.
Worker Types
controlplane/src/core/composition/composeGraphs.types.ts
New structured-clone-safe TypeScript interfaces for task inputs, serialized composed artifacts, warnings, subgraph metadata, per-contract artifacts, and task results.
Repository Integration
controlplane/src/core/repositories/FederatedGraphRepository.ts
Replaced legacy FederationResult-based composition with composeGraphsInWorker flow; updated mapping/deserialize helpers, error/warning handling, per-contract and feature-flag composition handling.
Server Lifecycle & Config
controlplane/src/core/build-server.ts, controlplane/src/index.ts, controlplane/src/core/env.schema.ts
Added COMPOSITION_MAX_THREADS env var and composition.maxThreads BuildConfig; initialize pool on startup via configureComposeGraphsPool and call destroyComposeGraphsPool on shutdown.
Other composition files
controlplane/src/core/composition/*
New composition-related modules/types grouped under composition/ (worker, pool, types) implementing worker-based composition pipeline.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 5.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the main feature: implementing worker-based graph composition using Tinypool, which is reflected across multiple new files (composeGraphs.pool.ts, composeGraphs.worker.ts, composeGraphs.types.ts) and integration points.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

📝 Coding Plan
  • Generate coding plan for human review comments

Comment @coderabbitai help to get the list of available commands and usage tips.

Tip

You can validate your CodeRabbit configuration file in your editor.

If your editor has YAML language server, you can enable auto-completion and validation by adding # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json at the top of your CodeRabbit configuration file.

@codecov
Copy link
Copy Markdown

codecov bot commented Mar 11, 2026

Codecov Report

❌ Patch coverage is 43.72385% with 269 lines in your changes missing coverage. Please review.
✅ Project coverage is 46.42%. Comparing base (a087898) to head (3138b4a).
⚠️ Report is 2 commits behind head on main.

Files with missing lines Patch % Lines
...plane/src/core/composition/composeGraphs.worker.ts 0.00% 186 Missing and 1 partial ⚠️
controlplane/src/core/composition/composer.ts 56.00% 33 Missing ⚠️
.../src/core/repositories/FederatedGraphRepository.ts 73.68% 25 Missing ⚠️
...olplane/src/core/composition/composeGraphs.pool.ts 81.39% 16 Missing ⚠️
controlplane/src/index.ts 0.00% 4 Missing ⚠️
controlplane/src/core/build-server.ts 50.00% 3 Missing ⚠️
controlplane/src/core/env.schema.ts 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2637      +/-   ##
==========================================
- Coverage   46.52%   46.42%   -0.10%     
==========================================
  Files        1043     1046       +3     
  Lines      141356   141488     +132     
  Branches     9680     9679       -1     
==========================================
- Hits        65759    65680      -79     
- Misses      73886    74095     +209     
- Partials     1711     1713       +2     
Files with missing lines Coverage Δ
...bufservices/federated-graph/checkFederatedGraph.ts 85.51% <100.00%> (+0.51%) ⬆️
...lplane/src/core/composition/composeGraphs.types.ts 100.00% <100.00%> (ø)
controlplane/src/core/composition/composition.ts 79.16% <100.00%> (-12.64%) ⬇️
...rc/core/repositories/GraphCompositionRepository.ts 97.19% <ø> (ø)
...ane/src/core/repositories/SchemaCheckRepository.ts 71.06% <100.00%> (ø)
controlplane/src/core/util.ts 80.76% <ø> (-0.61%) ⬇️
...ne/src/core/webhooks/OrganizationWebhookService.ts 58.27% <ø> (ø)
controlplane/src/core/env.schema.ts 0.00% <0.00%> (ø)
controlplane/src/core/build-server.ts 75.51% <50.00%> (-0.46%) ⬇️
controlplane/src/index.ts 0.00% <0.00%> (ø)
... and 4 more

... and 3 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@github-actions
Copy link
Copy Markdown

github-actions bot commented Mar 11, 2026

Router image scan passed

✅ No security vulnerabilities found in image:

ghcr.io/wundergraph/cosmo/router:sha-26e20f1d238b6230c92d008b048554e3452d71b7

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
controlplane/src/core/composition/composeGraphs.types.ts (1)

14-18: Carry downstream contract IDs through the worker payloads.

These types preserve only contractName, so the main thread has to map results back to contracts by mutable name after the worker returns. Passing the downstream federated graph ID here would remove that extra lookup and avoid coupling the handoff to a rename-sensitive identifier.

♻️ Suggested shape change
 export interface SerializedContractTagOptions {
+  contractGraphId: string;
   contractName: string;
   excludeTags: string[];
   includeTags: string[];
 }
@@
 export interface SerializedContractCompositionArtifact {
+  contractGraphId: string;
   contractName: string;
   artifact: SerializedComposedGraphArtifact;
 }

Also applies to: 44-47, 57-57

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@controlplane/src/core/composition/composeGraphs.types.ts` around lines 14 -
18, The current SerializedContractTagOptions type only carries contractName
which forces the main thread to re-map results by mutable names; update
SerializedContractTagOptions (and the other similar types referenced in this
diff) to include a stable downstream identifier field (e.g.,
downstreamFederatedGraphId or downstreamContractId) in addition to contractName
so workers emit the immutable downstream graph/contract id in payloads; modify
any places that construct or consume SerializedContractTagOptions/related types
to populate and use this new id instead of relying on contractName for final
mapping.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@controlplane/src/core/build-server.ts`:
- Around line 540-544: The shutdown currently calls worker.close() which, if it
rejects, prevents the subsequent destroyComposeGraphsPool() from running; update
the shutdown sequence around the worker.close() / BullMQ close logic so teardown
of the composition pool is unconditional—either wrap the close calls in
Promise.allSettled([...]) or put the close logic inside a try/finally and call
destroyComposeGraphsPool() in the finally block; ensure references to
worker.close() (the worker shutdown code) are guarded and
destroyComposeGraphsPool() is always invoked regardless of any rejection.

In `@controlplane/src/core/composition/composeGraphs.worker.ts`:
- Around line 237-240: The router config is being built with randomUUID() which
yields an incorrect schemaVersionId; instead pass the persisted
federatedSchemaVersionId into buildRouterExecutionConfig (replace the
randomUUID() argument with the federatedSchemaVersionId from the
task/saveComposition flow) so that buildRouterExecutionConfig (and the resulting
routerExecutionConfig) uses the same schemaVersionId persisted to the DB before
it is serialized and uploaded by
composeAndUploadRouterConfig/composeRouterConfigWithFeatureFlags; alternatively,
if you prefer minimal change, after deserializing routerExecutionConfig in
composeAndDeployGraphs() explicitly set routerExecutionConfig.schemaVersionId =
federatedSchemaVersionId before calling
saveComposition()/composeAndUploadRouterConfig().

---

Nitpick comments:
In `@controlplane/src/core/composition/composeGraphs.types.ts`:
- Around line 14-18: The current SerializedContractTagOptions type only carries
contractName which forces the main thread to re-map results by mutable names;
update SerializedContractTagOptions (and the other similar types referenced in
this diff) to include a stable downstream identifier field (e.g.,
downstreamFederatedGraphId or downstreamContractId) in addition to contractName
so workers emit the immutable downstream graph/contract id in payloads; modify
any places that construct or consume SerializedContractTagOptions/related types
to populate and use this new id instead of relying on contractName for final
mapping.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 349f055d-8d79-49e9-8efe-844cff9c93ed

📥 Commits

Reviewing files that changed from the base of the PR and between 0ea3e7d and 0fbb8b0.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (6)
  • controlplane/package.json
  • controlplane/src/core/build-server.ts
  • controlplane/src/core/composition/composeGraphs.pool.ts
  • controlplane/src/core/composition/composeGraphs.types.ts
  • controlplane/src/core/composition/composeGraphs.worker.ts
  • controlplane/src/core/repositories/FederatedGraphRepository.ts

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
controlplane/src/core/composition/composeGraphs.pool.ts (2)

25-35: Make worker resolution deterministic.

This currently picks composeGraphs.worker.ts whenever that file exists, even if the current module is the compiled .js build. If both source and build artifacts are present, the main thread and worker can end up using different module flavors. Deriving the extension from import.meta.url is safer than probing for sibling files.

Suggested fix
-import { existsSync } from 'node:fs';
 import { fileURLToPath } from 'node:url';
@@
 function getWorkerFilename() {
-  const sourceWorker = new URL('composeGraphs.worker.ts', import.meta.url);
-  if (existsSync(fileURLToPath(sourceWorker))) {
-    return {
-      filename: sourceWorker.href,
-    };
-  }
-
+  const currentPath = fileURLToPath(import.meta.url);
+  const extension = currentPath.endsWith('.ts') ? 'ts' : 'js';
   return {
-    filename: new URL('composeGraphs.worker.js', import.meta.url).href,
+    filename: new URL(`composeGraphs.worker.${extension}`, import.meta.url).href,
   };
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@controlplane/src/core/composition/composeGraphs.pool.ts` around lines 25 -
35, The getWorkerFilename function is non-deterministically choosing
composeGraphs.worker.ts when that file happens to exist; remove the filesystem
probe and instead derive the worker module extension from the current module's
import.meta.url so the main thread and worker use the same module flavor.
Replace the existsSync/fileURLToPath logic in getWorkerFilename with code that
reads the extension of import.meta.url (e.g., ".ts" or ".js") and returns a URL
for composeGraphs.worker with that same extension (using new
URL('composeGraphs.worker' + ext, import.meta.url).href), ensuring the function
always selects the worker file matching the current module type.

43-58: Block new submissions once teardown starts.

destroyComposeGraphsPool() clears the singleton before await pool.destroy() completes. A composeGraphsInWorker() call that lands during that window will create a brand-new pool and keep accepting work while shutdown is in progress. A small closing flag/promise would make teardown deterministic.

Suggested fix
 let composeGraphsPool: WorkerPool | undefined;
+let composeGraphsPoolClosing: Promise<void> | undefined;
@@
 function getComposeGraphsPool() {
+  if (composeGraphsPoolClosing) {
+    throw new Error('composeGraphs worker pool is shutting down');
+  }
+
   if (composeGraphsPool) {
     return composeGraphsPool;
   }
@@
 export async function destroyComposeGraphsPool() {
-  if (!composeGraphsPool) {
+  if (!composeGraphsPool) {
+    if (composeGraphsPoolClosing) {
+      await composeGraphsPoolClosing;
+    }
     return;
   }
 
   const pool = composeGraphsPool;
   composeGraphsPool = undefined;
-  await pool.destroy();
+  composeGraphsPoolClosing = pool.destroy().finally(() => {
+    composeGraphsPoolClosing = undefined;
+  });
+  await composeGraphsPoolClosing;
 }

Also applies to: 97-109

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@controlplane/src/core/composition/composeGraphs.pool.ts` around lines 43 -
58, The teardown races because destroyComposeGraphsPool() clears the
composeGraphsPool global before awaiting pool.destroy(), allowing
getComposeGraphsPool() (and thus composeGraphsInWorker()) to create a new pool
during shutdown; fix by adding a shutdown flag/promise (e.g.,
composeGraphsPoolShuttingDown or composeGraphsPoolDestroying) checked in
getComposeGraphsPool() to block or throw while teardown is in progress and set
that flag at the start of destroyComposeGraphsPool(), await pool.destroy(), then
clear the flag and the composeGraphsPool; update any call sites
(composeGraphsInWorker) to respect the flag/promise so new submissions are
rejected or queued until teardown completes.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@controlplane/src/core/composition/composeGraphs.pool.ts`:
- Around line 69-86: The deserializer deserializeComposedGraphArtifact is
dropping fieldConfigurations by hardcoding it to [], so update the serialization
shape and rehydration: add fieldConfigurations to
SerializedComposedGraphArtifact (with the correct type used by
ComposedFederatedGraph), ensure the composer writes fieldConfigurations into the
serialized artifact, and then set fieldConfigurations:
artifact.fieldConfigurations (or map/deserialize each entry if they need
conversion) inside deserializeComposedGraphArtifact instead of []. Make sure the
type imports/usages align with the existing ComposedFederatedGraph and
ComposedSubgraph types and handle undefined by defaulting to an empty array only
if truly absent.

---

Nitpick comments:
In `@controlplane/src/core/composition/composeGraphs.pool.ts`:
- Around line 25-35: The getWorkerFilename function is non-deterministically
choosing composeGraphs.worker.ts when that file happens to exist; remove the
filesystem probe and instead derive the worker module extension from the current
module's import.meta.url so the main thread and worker use the same module
flavor. Replace the existsSync/fileURLToPath logic in getWorkerFilename with
code that reads the extension of import.meta.url (e.g., ".ts" or ".js") and
returns a URL for composeGraphs.worker with that same extension (using new
URL('composeGraphs.worker' + ext, import.meta.url).href), ensuring the function
always selects the worker file matching the current module type.
- Around line 43-58: The teardown races because destroyComposeGraphsPool()
clears the composeGraphsPool global before awaiting pool.destroy(), allowing
getComposeGraphsPool() (and thus composeGraphsInWorker()) to create a new pool
during shutdown; fix by adding a shutdown flag/promise (e.g.,
composeGraphsPoolShuttingDown or composeGraphsPoolDestroying) checked in
getComposeGraphsPool() to block or throw while teardown is in progress and set
that flag at the start of destroyComposeGraphsPool(), await pool.destroy(), then
clear the flag and the composeGraphsPool; update any call sites
(composeGraphsInWorker) to respect the flag/promise so new submissions are
rejected or queued until teardown completes.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: ecf46c28-0948-4e1b-be94-6ee7dbedfe72

📥 Commits

Reviewing files that changed from the base of the PR and between 0fbb8b0 and b2158ba.

📒 Files selected for processing (1)
  • controlplane/src/core/composition/composeGraphs.pool.ts

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@controlplane/src/core/composition/composeGraphs.worker.ts`:
- Around line 67-69: Fix the missing space in the error message building in
composeGraphs.worker.ts: when constructing the message that uses
ROUTER_COMPATIBILITY_VERSIONS, ensure there's a space between "Cosmo." and
"Please" (e.g., add a trailing space to the first sentence or a leading space to
the second) so the concatenated string reads "Cosmo. Please set..."; locate the
concatenation where `Router compatibility version ${version} is not supported by
Cosmo.` is joined with `Please set one of the following valid versions:\n ` and
add the space.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: c3e2b0db-1bc5-4926-88e6-19109b0e54b7

📥 Commits

Reviewing files that changed from the base of the PR and between 73d6a12 and b60334a.

📒 Files selected for processing (3)
  • controlplane/src/core/composition/composeGraphs.pool.ts
  • controlplane/src/core/composition/composeGraphs.types.ts
  • controlplane/src/core/composition/composeGraphs.worker.ts

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
controlplane/src/core/composition/composeGraphs.worker.ts (1)

268-274: Consider defensive error handling for SDL parsing.

The parse(subgraph.schemaSDL) call can throw if the SDL is malformed. While published subgraphs should have valid SDL, consider whether this worker should handle parse errors gracefully or let them propagate.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@controlplane/src/core/composition/composeGraphs.worker.ts` around lines 268 -
274, The parse(subgraph.schemaSDL) call inside toCompositionSubgraphs can throw
on malformed SDL; wrap the parse call in a try/catch inside the
toCompositionSubgraphs function (or a small helper) to handle parse errors for
each SubgraphDTO: catch the error, include identifying context (e.g.,
subgraph.name and/or subgraph.routingUrl), log or propagate a clearer Error with
that context, and either skip/omit the failing subgraph or rethrow a formatted
error depending on desired behaviour so the failure is explicit and traceable.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@controlplane/src/core/composition/composeGraphs.worker.ts`:
- Around line 268-274: The parse(subgraph.schemaSDL) call inside
toCompositionSubgraphs can throw on malformed SDL; wrap the parse call in a
try/catch inside the toCompositionSubgraphs function (or a small helper) to
handle parse errors for each SubgraphDTO: catch the error, include identifying
context (e.g., subgraph.name and/or subgraph.routingUrl), log or propagate a
clearer Error with that context, and either skip/omit the failing subgraph or
rethrow a formatted error depending on desired behaviour so the failure is
explicit and traceable.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 8e2a29fc-054c-4529-b135-9d30e8b8156d

📥 Commits

Reviewing files that changed from the base of the PR and between b60334a and ea5a0d2.

📒 Files selected for processing (2)
  • controlplane/src/core/composition/composeGraphs.worker.ts
  • controlplane/src/core/repositories/FederatedGraphRepository.ts

@StarpTech StarpTech requested a review from Aenimus March 12, 2026 14:19
@StarpTech StarpTech requested a review from Aenimus March 12, 2026 16:53
@StarpTech StarpTech requested a review from Aenimus March 12, 2026 18:45
@StarpTech StarpTech merged commit ead3683 into main Mar 16, 2026
55 checks passed
@StarpTech StarpTech deleted the dustin/eng-8238-mitigate-event-loop-block-in-cosmo-controlplane branch March 16, 2026 15:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants