Improvements to the runs replication service to prevent buffer exhaustion #2047
Walkthrough
This change upgrades the task runs replication system to use a new ClickHouse table and schema version (…
Sequence Diagram(s)
sequenceDiagram
participant Env as Environment
participant Init as runsReplicationInstance.server.ts
participant Service as RunsReplicationService
participant ClickHouse as ClickHouse DB
Env->>Init: Provides RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT
Init->>Service: Initialize RunsReplicationService({ waitForAsyncInsert })
Service->>ClickHouse: Insert TaskRunV2 (with wait_for_async_insert setting)
Service->>ClickHouse: Insert Payload (traced, sequentially after TaskRun)
Actionable comments posted: 1
🔭 Outside diff range comments (2)
internal-packages/clickhouse/src/taskRuns.ts (1)
45-46: 🛠️ Refactor suggestion
Store _version as a numeric type to avoid implicit parsing in ClickHouse
_version is declared as a String, but the target column is UInt64.
Relying on ClickHouse to coerce quoted strings to integers works, yet it prevents the client-side schema from catching accidental non-numeric values and incurs an extra parse step on every insert.
- _version: z.string(),
+ // Accept either a bigint or its string representation. Using bigint keeps
+ // type-safety while still allowing `.toString()` when serialising.
+ _version: z
+   .union([z.number().int(), z.string().regex(/^\d+$/)])
+   .transform((v) => Number(v)),
Adopting a numeric type here will surface bugs earlier and keep the TypeScript view aligned with the ClickHouse column definition.
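The suggested schema converts with Number(v), and a JavaScript number cannot represent every UInt64 value exactly (anything above Number.MAX_SAFE_INTEGER loses precision). As a minimal sketch of the same client-side validation idea, assuming the version is carried as a bigint, a plain helper could look like this. The name parseVersion is hypothetical and is not part of the PR or of zod.

```typescript
// Hypothetical helper (not from the PR): validate a version on the client
// and normalise it to bigint so the full UInt64 range stays exact.
const UINT64_MAX = BigInt("18446744073709551615");

function parseVersion(input: bigint | string): bigint {
  let value: bigint;
  if (typeof input === "bigint") {
    value = input;
  } else {
    // Only plain decimal digits are accepted; "12abc", "-1", "" and "1.5" are rejected.
    if (!/^\d+$/.test(input)) {
      throw new TypeError(`version is not a decimal integer: "${input}"`);
    }
    value = BigInt(input);
  }
  if (value < BigInt(0) || value > UINT64_MAX) {
    throw new RangeError("version does not fit into UInt64");
  }
  return value;
}

// The largest UInt64 survives the round trip without rounding.
console.log(parseVersion("18446744073709551615") === UINT64_MAX);
```

Rejecting bad input here means a malformed version fails in the application, with a stack trace, rather than during the ClickHouse insert.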
apps/webapp/app/services/runsReplicationService.server.ts (1)
598-600: 🛠️ Refactor suggestion
Avoid stringification of _version to preserve numeric ordering
_version is cast to string before insertion. When ClickHouse receives "123" it parses it back to UInt64, but the extra hop costs CPU and loses type guarantees in TypeScript.
If you adopt a numeric schema for _version (see earlier comment), you can keep the value as bigint all the way:
- _version: _version.toString(),
+ _version,
This removes an unnecessary conversion and keeps the application & database types aligned.
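To illustrate the ordering concern on the application side: once versions are strings, any comparison done in TypeScript is lexicographic rather than numeric. ClickHouse itself compares the parsed UInt64, so this sketch only covers code that handles the value before insertion. The sample values are made up.

```typescript
// Made-up sample versions, first as strings and then as bigints.
const versionsAsStrings = ["9", "10", "100"];
const versionsAsBigints = versionsAsStrings.map((v) => BigInt(v));

// The default sort compares strings by UTF-16 code units, so "10" sorts before "9".
const stringOrder = [...versionsAsStrings].sort();

// bigint values compare numerically.
const numericOrder = [...versionsAsBigints].sort((a, b) => (a < b ? -1 : a > b ? 1 : 0));

console.log(stringOrder);                          // [ '10', '100', '9' ]
console.log(numericOrder.map((v) => v.toString())); // [ '9', '10', '100' ]
```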
🧹 Nitpick comments (5)
internal-packages/clickhouse/src/taskRuns.ts (1)
54-63: Duplicated wait_for_async_insert setting may hide configuration errors
The builder already sets a default wait_for_async_insert: 0.
Downstream callers (e.g. RunsReplicationService) override the same setting via params.clickhouse_settings.
Having two sources of truth makes it hard to reason about which value ultimately reaches ClickHouse.
Consider removing the default here and requiring callers to specify the value explicitly:
- wait_for_async_insert: 0,
+ // let callers decide whether to block on async inserts
This keeps the behaviour predictable and limits surprises when environment variables change.
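One way to make the layering visible is to resolve the settings in a single place and record which layer supplied each value. The sketch below is illustrative only: resolveSettings and the layer labels are hypothetical and do not reflect how the ClickHouse client in this repository merges settings.

```typescript
type ClickhouseSettings = Record<string, string | number>;

interface SettingsLayer {
  source: string; // free-form label, e.g. "builder default" or "caller"
  settings: ClickhouseSettings;
}

// Hypothetical merge helper: later layers win, and the winning layer
// for each key is recorded so the effective value can be traced.
function resolveSettings(layers: SettingsLayer[]): {
  settings: ClickhouseSettings;
  origin: Record<string, string>;
} {
  const settings: ClickhouseSettings = {};
  const origin: Record<string, string> = {};
  for (const layer of layers) {
    for (const key of Object.keys(layer.settings)) {
      const value = layer.settings[key];
      if (value === undefined) continue;
      settings[key] = value;
      origin[key] = layer.source;
    }
  }
  return { settings, origin };
}

const resolved = resolveSettings([
  { source: "builder default", settings: { wait_for_async_insert: 0 } },
  { source: "caller", settings: { wait_for_async_insert: 1 } },
]);
console.log(resolved.settings, resolved.origin);
```

With only one layer left (the reviewer's proposal), origin would always name the caller, which is the predictability the comment asks for.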
internal-packages/clickhouse/schema/004_create_task_runs_v2.sql (1)
74-77: Add a TTL for tombstoned rows to keep the ReplacingMergeTree lean
A ReplacingMergeTree keeps all versions of a row until they are merged out.
Because _is_deleted = 1 marks logical deletions, the table will grow indefinitely unless old versions are eventually purged.
You can instruct ClickHouse to drop tombstoned rows after a grace period with a TTL:
ALTER TABLE trigger_dev.task_runs_v2 MODIFY TTL created_at + INTERVAL 30 DAY DELETE WHERE _is_deleted = 1;
(or another retention window that matches compliance requirements).
This prevents long-running merges and unbounded disk growth.
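For readers unfamiliar with the engine, the following is a simplified in-memory model of what a deduplicated read is meant to return: per key, only the row with the highest _version, and nothing at all if that row is a tombstone. It is a sketch of the semantics, not ClickHouse's merge algorithm, and the run_id and status columns are hypothetical.

```typescript
interface RunRow {
  run_id: string;      // hypothetical sorting-key column
  status: string;      // hypothetical data column
  _version: bigint;
  _is_deleted: 0 | 1;
}

// Keep the highest-version row per key (the later row wins on ties),
// then hide keys whose surviving row is marked as deleted.
function collapse(rows: RunRow[]): RunRow[] {
  const latest = new Map<string, RunRow>();
  for (const row of rows) {
    const seen = latest.get(row.run_id);
    if (seen === undefined || row._version >= seen._version) {
      latest.set(row.run_id, row);
    }
  }
  const visible: RunRow[] = [];
  latest.forEach((row) => {
    if (row._is_deleted === 0) visible.push(row);
  });
  return visible;
}

const stored: RunRow[] = [
  { run_id: "run_a", status: "EXECUTING", _version: BigInt(1), _is_deleted: 0 },
  { run_id: "run_a", status: "COMPLETED", _version: BigInt(2), _is_deleted: 0 },
  { run_id: "run_b", status: "EXECUTING", _version: BigInt(1), _is_deleted: 0 },
  { run_id: "run_b", status: "EXECUTING", _version: BigInt(2), _is_deleted: 1 },
];
console.log(collapse(stored).map((r) => `${r.run_id}:${r.status}`));
```

All four stored rows occupy space until they are merged or expired, even though only one is visible, which is the growth the TTL suggestion targets.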
apps/webapp/app/services/runsReplicationService.server.ts (3)
53-54: Expose a sane default for waitForAsyncInsert
waitForAsyncInsert is optional but later coerced with this.options.waitForAsyncInsert ? 1 : 0.
If the flag is omitted, we silently fall back to 0, which means the insert call returns without waiting for the async insert buffer to be flushed.
Consider defaulting the option to true (or documenting the current implicit false) to avoid ambiguity.
- waitForAsyncInsert?: boolean;
+ waitForAsyncInsert?: boolean; // defaults to FALSE when unspecified
At minimum, document the default in the type comment so callers know what to expect.
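A minimal sketch of making that default explicit, following the flow from RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT to the wait_for_async_insert setting. The helpers parseFlag and toWaitSetting and the simulated env object are assumptions for illustration; the PR's actual env parsing in env.server.ts is not shown in this review.

```typescript
// Hypothetical parser for a "0"/"1" flag; an unset value falls back to an explicit default.
function parseFlag(raw: string | undefined, defaultValue: boolean): boolean {
  if (raw === undefined || raw === "") return defaultValue;
  if (raw === "1") return true;
  if (raw === "0") return false;
  throw new Error(`expected "0" or "1", got "${raw}"`);
}

interface ReplicationOptions {
  /** Whether inserts wait for the async insert flush. Defaults to false. */
  waitForAsyncInsert?: boolean;
}

// The default lives in one visible place instead of relying on `undefined ? 1 : 0`.
function toWaitSetting(options: ReplicationOptions): 0 | 1 {
  const wait = options.waitForAsyncInsert ?? false;
  return wait ? 1 : 0;
}

// Simulated environment; a real service would read its validated env object.
const simulatedEnv: Record<string, string | undefined> = {
  RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT: "1",
};
const replicationOptions: ReplicationOptions = {
  waitForAsyncInsert: parseFlag(simulatedEnv["RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT"], false),
};
console.log(toWaitSetting(replicationOptions));
```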
440-442: Inserts are now strictly sequential; consider re-introducing parallelism
#insertTaskRunInserts and #insertPayloadInserts are awaited one after the other.
Under high throughput this can increase per-batch latency and reduce throughput compared to the previous Promise.all pattern, because the second insert cannot start until the first has finished.
- await this.#insertTaskRunInserts(taskRunInserts);
- await this.#insertPayloadInserts(payloadInserts);
+ await Promise.all([
+   this.#insertTaskRunInserts(taskRunInserts),
+   this.#insertPayloadInserts(payloadInserts),
+ ]);
Unless there is a strong ordering requirement, running both inserts concurrently will better utilise the ClickHouse write pipeline.
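The difference between the two patterns can be seen with stand-in inserts that only record when they start and finish. This is a toy sketch: fakeInsert and its delays are invented, and it says nothing about whether the PR has an ordering requirement between task run and payload inserts.

```typescript
// Stand-in for an insert call: logs its start, then resolves after `ms` milliseconds.
function fakeInsert(label: string, ms: number, log: string[]): Promise<void> {
  log.push(`${label}:start`);
  return new Promise<void>((resolve) => {
    setTimeout(() => {
      log.push(`${label}:end`);
      resolve();
    }, ms);
  });
}

// Sequential: the payload insert starts only after the task run insert has finished.
async function insertSequentially(log: string[]): Promise<void> {
  await fakeInsert("taskRuns", 20, log);
  await fakeInsert("payloads", 10, log);
}

// Concurrent: both inserts start immediately and overlap in time.
async function insertConcurrently(log: string[]): Promise<void> {
  await Promise.all([
    fakeInsert("taskRuns", 20, log),
    fakeInsert("payloads", 10, log),
  ]);
}

const sequentialLog: string[] = [];
const concurrentLog: string[] = [];
const bothRuns = insertSequentially(sequentialLog)
  .then(() => insertConcurrently(concurrentLog))
  .then(() => {
    console.log(sequentialLog.join(" "));
    console.log(concurrentLog.join(" "));
  });
```

In the concurrent variant the payload insert can complete before the task run insert, so it is only appropriate when nothing depends on the task run rows being written first.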
458-460: Repeated per-call settings override can mask table-level defaults
Every insert passes clickhouse_settings.wait_for_async_insert.
Together with the defaults defined in taskRuns.insertTaskRuns this creates multiple override layers.
Prefer pushing the setting into the table-specific insert builder once and omitting it here to keep the insert code concise.
📜 Review details
📒 Files selected for processing (7)
apps/webapp/app/env.server.ts (1 hunks)
apps/webapp/app/services/runsReplicationInstance.server.ts (1 hunks)
apps/webapp/app/services/runsReplicationService.server.ts (8 hunks)
apps/webapp/test/runsReplicationService.test.ts (10 hunks)
internal-packages/clickhouse/schema/004_create_task_runs_v2.sql (1 hunks)
internal-packages/clickhouse/src/taskRuns.test.ts (3 hunks)
internal-packages/clickhouse/src/taskRuns.ts (2 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
internal-packages/clickhouse/src/taskRuns.ts (1)
internal-packages/clickhouse/src/client/types.ts (1)
ClickhouseWriter (56-65)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: units / 🧪 Unit Tests
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (6)
apps/webapp/app/env.server.ts (1)
771-771: Added new environment variable to control ClickHouse async insert behavior.
The new RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT variable provides a toggle for the ClickHouse wait_for_async_insert setting, defaulting to "0" (disabled). This is part of the buffer exhaustion prevention strategy.
internal-packages/clickhouse/src/taskRuns.test.ts (3)
6-6: Updated test suite name to reflect the new schema version.
The test suite name now correctly reflects the migration from TaskRunV1 to TaskRunV2.
73-73: Table reference updated to use the new schema version.
Query now correctly targets task_runs_v2 instead of the previous task_runs_v1 table.
229-229: Updated FINAL query to use the new table version.
Deduplication test is now correctly targeting the task_runs_v2 table with the FINAL modifier.
apps/webapp/app/services/runsReplicationInstance.server.ts (1)
52-52: Added configurable async insert setting to the replication service.
The service now passes the new waitForAsyncInsert option based on the environment variable, which allows dynamic control of ClickHouse's async insert behavior. This is a key component of the buffer exhaustion prevention strategy in this PR.
apps/webapp/test/runsReplicationService.test.ts (1)
93-93: Updated all test queries to use the new task_runs_v2 table.
All test queries have been correctly migrated to reference the new task_runs_v2 table, ensuring test coverage for the updated schema implementation.
Also applies to: 282-282, 607-607, 716-716, 841-841, 969-969, 1120-1120, 1239-1239, 1378-1378, 1491-1491
  async #insertTaskRunInserts(taskRunInserts: TaskRunV2[]) {
    return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
      const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert(
        taskRunInserts,
        {
          params: {
            clickhouse_settings: {
              wait_for_async_insert: this.options.waitForAsyncInsert ? 1 : 0,
            },
          },
        }
      );
Potential typo: taskRuns.insert may not exist
The clickhouse helper exports insertTaskRuns, yet here we call this.options.clickhouse.taskRuns.insert(...).
If insert is not attached to taskRuns, this will throw at runtime.
Double-check the client initialisation. If the intended method is insertTaskRuns, update the call:
- const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert(
-   taskRunInserts,
+ const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertTaskRuns(
+   taskRunInserts,
Failing fast here prevents silent data loss during replication.
📝 Committable suggestion
  async #insertTaskRunInserts(taskRunInserts: TaskRunV2[]) {
    return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
-     const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert(
-       taskRunInserts,
+     const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertTaskRuns(
+       taskRunInserts,
        {
          params: {
            clickhouse_settings: {
              wait_for_async_insert: this.options.waitForAsyncInsert ? 1 : 0,
            },
          },
        }
      );
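The snippet above destructures an [insertError, insertResult] tuple, so a failed insert does not throw on its own; the caller has to inspect the first element. A minimal sketch of failing fast on that tuple follows. The stand-in fakeInsertRows, the InsertOutcome type, and the written field are hypothetical and are not the repository's ClickHouse client API.

```typescript
// Assumed result shape: either an error or a result, mirroring the
// [insertError, insertResult] destructuring in the reviewed code.
type InsertOutcome<T> = [Error | null, T | null];

// Hypothetical stand-in for an insert call that reports failure via the tuple.
async function fakeInsertRows(
  rows: unknown[],
  fail: boolean
): Promise<InsertOutcome<{ written: number }>> {
  if (fail) return [new Error("insert rejected"), null];
  return [null, { written: rows.length }];
}

// Fail fast: surface the error instead of continuing with a missing result.
async function insertOrThrow(rows: unknown[], fail: boolean = false): Promise<number> {
  const [insertError, insertResult] = await fakeInsertRows(rows, fail);
  if (insertError !== null) {
    throw insertError;
  }
  if (insertResult === null) {
    throw new Error("insert returned neither an error nor a result");
  }
  return insertResult.written;
}

const successfulInsert = insertOrThrow([{ id: 1 }, { id: 2 }, { id: 3 }]);
const failedInsert = insertOrThrow([], true).then(
  () => "resolved",
  () => "rejected"
);
```

Whether the replication service should rethrow, retry, or only record the error on the span is a design decision this review does not settle.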