
Improvements to the runs replication service to prevent buffer exhaustion #2047


Merged
ericallam merged 1 commit into main from runs-replication-perf-improvements
May 14, 2025

Conversation

ericallam
Member

@ericallam ericallam commented May 14, 2025

Summary by CodeRabbit

  • New Features

    • Introduced a new, more detailed task runs table for improved tracking and querying of run data.
    • Added configuration for asynchronous insert behavior in task run replication.
  • Improvements

    • Upgraded task run data handling to use the latest schema version.
    • Enhanced observability with tracing for data insertion operations.
  • Bug Fixes

    • Updated tests and queries to align with the new task runs table version.


changeset-bot bot commented May 14, 2025

⚠️ No Changeset found

Latest commit: 71aa4a0

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.


Contributor

coderabbitai bot commented May 14, 2025

Walkthrough

This change upgrades the task runs replication system to use a new ClickHouse table and schema version (task_runs_v2/TaskRunV2), adds tracing instrumentation, and introduces a configurable async insert setting via environment variable. Test queries and internal logic are updated to target the new schema and table.

Changes

| Files/Paths | Change Summary |
| --- | --- |
| apps/webapp/app/env.server.ts | Added RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT to the environment schema with a default of "0". |
| apps/webapp/app/services/runsReplicationInstance.server.ts | Passes waitForAsyncInsert option to RunsReplicationService based on the new environment variable. |
| apps/webapp/app/services/runsReplicationService.server.ts | Upgrades logic to use TaskRunV2 instead of TaskRunV1, adds tracing spans to insertions, makes async insert configurable, and changes batch flush to sequential inserts. Updates method signatures and options accordingly. |
| apps/webapp/test/runsReplicationService.test.ts | All test queries updated from task_runs_v1 to task_runs_v2. No logic changes. |
| internal-packages/clickhouse/schema/004_create_task_runs_v2.sql | Adds migration for new task_runs_v2 table and associated view, including schema, indices, and down migration. |
| internal-packages/clickhouse/src/taskRuns.test.ts | Test suite and queries updated to use task_runs_v2 instead of task_runs_v1. |
| internal-packages/clickhouse/src/taskRuns.ts | Renames schema/type from TaskRunV1 to TaskRunV2, updates insert function to use new table and schema. |

Sequence Diagram(s)

sequenceDiagram
    participant Env as Environment
    participant Init as runsReplicationInstance.server.ts
    participant Service as RunsReplicationService
    participant ClickHouse as ClickHouse DB

    Env->>Init: Provides RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT
    Init->>Service: Initialize RunsReplicationService({ waitForAsyncInsert })
    Service->>ClickHouse: Insert TaskRunV2 (with wait_for_async_insert setting)
    Service->>ClickHouse: Insert Payload (traced, sequentially after TaskRun)
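The flow in the diagram can be sketched roughly as follows. This is a minimal illustration, not the PR's code: only the names RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT, waitForAsyncInsert and wait_for_async_insert come from the PR. The helper functions and the assumption that the environment value is the string "0" or "1" are hypothetical.

```typescript
// Hypothetical helpers; only the option name and the ClickHouse setting
// name are taken from the PR.
type ReplicationOptions = { waitForAsyncInsert?: boolean };

// Assumption: the env schema yields the string "0" or "1", defaulting to "0".
function parseWaitForAsyncInsert(raw: string | undefined): boolean {
  return (raw ?? "0") === "1";
}

// Maps the service option onto the per-insert ClickHouse setting,
// mirroring the `? 1 : 0` coercion quoted in the review below.
function toClickhouseSettings(
  options: ReplicationOptions
): { wait_for_async_insert: 0 | 1 } {
  return { wait_for_async_insert: options.waitForAsyncInsert ? 1 : 0 };
}

// Stand-in for reading RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT=1.
const options: ReplicationOptions = {
  waitForAsyncInsert: parseWaitForAsyncInsert("1"),
};
const settings = toClickhouseSettings(options);
```

With the variable unset, the sketch resolves to wait_for_async_insert: 0, which matches the documented default of "0".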

Possibly related PRs

  • triggerdotdev/trigger.dev#2045: Both PRs modify the initialization of RunsReplicationService to add new configuration options, indicating related changes in service setup and instantiation.

Suggested reviewers

  • matt-aitken

Poem

In the garden of code, a new table grows,
V2 seeds planted where V1 once rose.
With tracing and toggles, async at play,
The data hops swiftly, in a bright, modern way.
🐇 Cheers to fresh carrots in ClickHouse rows!


Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🔭 Outside diff range comments (2)
internal-packages/clickhouse/src/taskRuns.ts (1)

45-46: 🛠️ Refactor suggestion

Store _version as a numeric type to avoid implicit parsing in ClickHouse

_version is declared as a String, but the target column is UInt64.
Relying on ClickHouse to coerce quoted strings to integers works, yet it prevents the client-side schema from catching accidental non-numeric values and incurs an extra parse step on every insert.

-  _version: z.string(),
+  // Accept either a bigint or its decimal string representation. Using bigint
+  // keeps type-safety across the full UInt64 range while still allowing
+  // `.toString()` when serialising.
+  _version: z
+    .union([z.bigint(), z.string().regex(/^\d+$/)])
+    .transform((v) => BigInt(v)),

Adopting a numeric type here will surface bugs earlier and keeps the TypeScript view aligned with the ClickHouse column definition.
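To illustrate why the numeric type matters for a UInt64 column, here is a dependency-free sketch of the same validation idea. parseVersion is a hypothetical helper, not part of the PR, which keeps _version as a string. The point it shows is that a JavaScript number cannot hold every UInt64 value exactly, while bigint can.

```typescript
// Largest value a ClickHouse UInt64 column can hold (2^64 - 1).
const UINT64_MAX = BigInt("18446744073709551615");

// Hypothetical validator: accepts a bigint or a decimal string and
// returns a bigint, rejecting anything outside the UInt64 range.
function parseVersion(value: bigint | string): bigint {
  if (typeof value === "string" && !/^\d+$/.test(value)) {
    throw new Error(`_version is not an unsigned integer: ${value}`);
  }
  const parsed = BigInt(value);
  if (parsed < BigInt(0) || parsed > UINT64_MAX) {
    throw new Error(`_version is outside the UInt64 range: ${parsed}`);
  }
  return parsed;
}

// 2^53 + 1 is a valid UInt64 but not exactly representable as a number.
const large = "9007199254740993";
const viaNumber = Number(large); // silently rounded to a neighbouring value
const viaBigInt = parseVersion(large); // exact
```

Converting through Number would therefore be lossy for large versions, which is why the sketch stays in bigint throughout.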

apps/webapp/app/services/runsReplicationService.server.ts (1)

598-600: 🛠️ Refactor suggestion

Avoid stringification of _version to preserve numeric ordering

_version is cast to string before insertion. When ClickHouse receives "123" it parses it back to UInt64, but the extra hop costs CPU and loses type guarantees in TypeScript.

If you adopt a numeric schema for _version (see earlier comment), you can keep the value as bigint all the way:

-      _version: _version.toString(),
+      _version,

This removes an unnecessary conversion and keeps the application & database types aligned.
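One caveat with keeping bigint all the way: if the ClickHouse client serialises rows with JSON.stringify (an assumption, not confirmed by this PR), a raw bigint will throw. The sketch below shows the failure and one possible workaround using a replacer. The run_id field and its value are hypothetical.

```typescript
// Hypothetical row; only `_version` is a field named in the review.
const row = { run_id: "run_123", _version: BigInt("9007199254740993") };

// JSON.stringify has no built-in encoding for bigint and throws a TypeError.
let threw = false;
try {
  JSON.stringify(row);
} catch {
  threw = true;
}

// A replacer can emit bigint values as decimal strings at the
// serialisation boundary, so application code can stay in bigint.
const json = JSON.stringify(row, (_key, value) =>
  typeof value === "bigint" ? value.toString() : value
);
```

This moves the string conversion to a single place instead of removing it entirely, so the existing .toString() call may be there for exactly this reason.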

🧹 Nitpick comments (5)
internal-packages/clickhouse/src/taskRuns.ts (1)

54-63: Duplicated wait_for_async_insert setting may hide configuration errors

The builder already sets a default wait_for_async_insert: 0.
Down-stream callers (e.g. RunsReplicationService) override the same setting via params.clickhouse_settings.
Having two sources of truth makes it hard to reason about which value ultimately reaches ClickHouse.

Consider removing the default here and require callers to specify the value explicitly:

-      wait_for_async_insert: 0,
+      // let callers decide whether to block on async inserts

This keeps the behaviour predictable and limits surprises when environment variables change.
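The "two sources of truth" concern can be made concrete with a small sketch. It assumes the builder merges its defaults with per-call settings via object spread, with per-call values winning. That merge order, the resolveSettings helper, and the async_insert default are assumptions for illustration only.

```typescript
type InsertSettings = {
  async_insert?: 0 | 1;
  wait_for_async_insert?: 0 | 1;
};

// Assumption: builder defaults are applied first, per-call settings last.
function resolveSettings(
  builderDefaults: InsertSettings,
  perCall: InsertSettings = {}
): InsertSettings {
  return { ...builderDefaults, ...perCall };
}

const builderDefaults: InsertSettings = {
  async_insert: 1,
  wait_for_async_insert: 0,
};

// The caller's value wins over the builder default.
const overridden = resolveSettings(builderDefaults, { wait_for_async_insert: 1 });

// With no per-call settings the builder default applies.
const untouched = resolveSettings(builderDefaults);

// Pitfall: an explicit `undefined` from a caller also overwrites the default.
const clobbered = resolveSettings(builderDefaults, {
  wait_for_async_insert: undefined,
});
```

Under these assumptions the effective value depends on both layers, which is the ambiguity the comment is pointing at.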

internal-packages/clickhouse/schema/004_create_task_runs_v2.sql (1)

74-77: Add a TTL for tombstoned rows to keep the ReplacingMergeTree lean

A ReplacingMergeTree keeps all versions of a row until they are merged out.
Because _is_deleted = 1 marks logical deletions, the table will grow indefinitely unless old versions are eventually purged.

You can instruct ClickHouse to drop superseded / deleted rows after a grace period with a TTL:

ALTER TABLE trigger_dev.task_runs_v2
  MODIFY TTL created_at + INTERVAL 30 DAY DELETE
  WHERE _is_deleted = 1;

(or another retention window that matches compliance requirements).

This prevents long-running merges and unbounded disk growth.
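As a rough mental model of why tombstones accumulate, the sketch below simulates reading a ReplacingMergeTree-style table in memory: every inserted version stays in storage, and a deduplicated read keeps only the highest _version per key and then hides rows whose latest version is a tombstone. This is a simplification of ClickHouse's behaviour, and the run_id and status columns are hypothetical.

```typescript
type Row = {
  run_id: string;
  status: string;
  _version: bigint;
  _is_deleted: 0 | 1;
};

// Simplified model of a deduplicated read: keep the highest _version per
// key, then drop keys whose latest version is marked as deleted.
function readDeduplicated(rows: Row[]): Row[] {
  const latest = new Map<string, Row>();
  for (const row of rows) {
    const current = latest.get(row.run_id);
    if (!current || row._version > current._version) {
      latest.set(row.run_id, row);
    }
  }
  return Array.from(latest.values()).filter((row) => row._is_deleted === 0);
}

// Storage still holds all four versions until a merge or TTL removes them.
const stored: Row[] = [
  { run_id: "run_a", status: "PENDING", _version: BigInt(1), _is_deleted: 0 },
  { run_id: "run_a", status: "COMPLETED", _version: BigInt(2), _is_deleted: 0 },
  { run_id: "run_b", status: "PENDING", _version: BigInt(1), _is_deleted: 0 },
  { run_id: "run_b", status: "PENDING", _version: BigInt(2), _is_deleted: 1 },
];

const visible = readDeduplicated(stored);
```

Here only one row is visible to readers while four remain stored, which is the growth a TTL is meant to bound.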

apps/webapp/app/services/runsReplicationService.server.ts (3)

53-54: Expose a sane default for waitForAsyncInsert

waitForAsyncInsert is optional but later coerced with this.options.waitForAsyncInsert ? 1 : 0.
If the flag is omitted, wait_for_async_insert silently falls back to 0, so inserts return before the async insert buffer has been flushed.
Consider defaulting the option to true (or documenting the current implicit false) to avoid ambiguity.

-  waitForAsyncInsert?: boolean;
+  waitForAsyncInsert?: boolean; // defaults to FALSE when unspecified

At minimum, document the default in the type comment so callers know what to expect.


440-442: Inserts are now strictly sequential – consider re-introducing parallelism

#insertTaskRunInserts and #insertPayloadInserts are awaited one after the other.
Each flush now takes the sum of both insert durations rather than the longer of the two, as it did with the previous Promise.all pattern, which can reduce throughput under high load.

-      await this.#insertTaskRunInserts(taskRunInserts);
-      await this.#insertPayloadInserts(payloadInserts);
+      await Promise.all([
+        this.#insertTaskRunInserts(taskRunInserts),
+        this.#insertPayloadInserts(payloadInserts),
+      ]);

Unless there is a strong ordering requirement, running both inserts concurrently will better utilise the ClickHouse write pipeline.
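The trade-off between the two patterns can be sketched with a stand-in insert that tracks how many calls are in flight at once. fakeInsert and both flush functions are hypothetical. The sketch only shows that sequential awaits keep one insert in flight at a time while Promise.all starts both together, which may be relevant given that the PR's stated goal is to prevent buffer exhaustion.

```typescript
let inFlight = 0;
let maxInFlight = 0;

// Hypothetical stand-in for a ClickHouse insert; it yields once so that
// concurrently started calls overlap.
async function fakeInsert(): Promise<void> {
  inFlight += 1;
  maxInFlight = Math.max(maxInFlight, inFlight);
  await Promise.resolve();
  inFlight -= 1;
}

// Sequential flush: the second insert starts only after the first settles.
async function flushSequential(): Promise<number> {
  maxInFlight = 0;
  await fakeInsert();
  await fakeInsert();
  return maxInFlight;
}

// Parallel flush: both inserts are started before either settles.
async function flushParallel(): Promise<number> {
  maxInFlight = 0;
  await Promise.all([fakeInsert(), fakeInsert()]);
  return maxInFlight;
}
```

Which pattern is preferable depends on whether latency or peak concurrent load is the tighter constraint for the replication service.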


458-460: Repeated per-call settings override can mask table-level defaults

Every insert passes clickhouse_settings.wait_for_async_insert.
Together with the defaults defined in taskRuns.insertTaskRuns this creates multiple override layers.
Prefer pushing the setting into the table-specific insert builder once and omit it here to keep the insert code concise.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fa5360a and 71aa4a0.

📒 Files selected for processing (7)
  • apps/webapp/app/env.server.ts (1 hunks)
  • apps/webapp/app/services/runsReplicationInstance.server.ts (1 hunks)
  • apps/webapp/app/services/runsReplicationService.server.ts (8 hunks)
  • apps/webapp/test/runsReplicationService.test.ts (10 hunks)
  • internal-packages/clickhouse/schema/004_create_task_runs_v2.sql (1 hunks)
  • internal-packages/clickhouse/src/taskRuns.test.ts (3 hunks)
  • internal-packages/clickhouse/src/taskRuns.ts (2 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
internal-packages/clickhouse/src/taskRuns.ts (1)
internal-packages/clickhouse/src/client/types.ts (1)
  • ClickhouseWriter (56-65)
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: units / 🧪 Unit Tests
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (6)
apps/webapp/app/env.server.ts (1)

771-771: Added new environment variable to control ClickHouse async insert behavior.

The new RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT variable provides a toggle for the ClickHouse wait_for_async_insert setting, defaulting to "0" (disabled). This is part of the buffer exhaustion prevention strategy.

internal-packages/clickhouse/src/taskRuns.test.ts (3)

6-6: Updated test suite name to reflect the new schema version.

The test suite name now correctly reflects the migration from TaskRunV1 to TaskRunV2.


73-73: Table reference updated to use the new schema version.

Query now correctly targets task_runs_v2 instead of the previous task_runs_v1 table.


229-229: Updated FINAL query to use the new table version.

Deduplication test is now correctly targeting the task_runs_v2 table with the FINAL modifier.

apps/webapp/app/services/runsReplicationInstance.server.ts (1)

52-52: Added configurable async insert setting to the replication service.

The service now passes the new waitForAsyncInsert option based on the environment variable, which allows dynamic control of ClickHouse's async insert behavior. This is a key component of the buffer exhaustion prevention strategy in this PR.

apps/webapp/test/runsReplicationService.test.ts (1)

93-93: Updated all test queries to use the new task_runs_v2 table.

All test queries have been correctly migrated to reference the new task_runs_v2 table, ensuring test coverage for the updated schema implementation.

Also applies to: 282-282, 607-607, 716-716, 841-841, 969-969, 1120-1120, 1239-1239, 1378-1378, 1491-1491

Comment on lines +451 to +462
  async #insertTaskRunInserts(taskRunInserts: TaskRunV2[]) {
    return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
      const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert(
        taskRunInserts,
        {
          params: {
            clickhouse_settings: {
              wait_for_async_insert: this.options.waitForAsyncInsert ? 1 : 0,
            },
          },
        }
      );
Contributor


⚠️ Potential issue

Potential typo – taskRuns.insert may not exist

The clickhouse helper exports insertTaskRuns, yet here we call this.options.clickhouse.taskRuns.insert(...).
If insert is not attached to taskRuns, this will throw at runtime.

Double-check the client initialisation. If the intended method is insertTaskRuns, update the call:

-      const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert(
-        taskRunInserts,
+      const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertTaskRuns(
+        taskRunInserts,

Failing fast here prevents silent data loss during replication.
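The [insertError, insertResult] destructuring in the snippet suggests the insert helper returns an error/result tuple rather than throwing. A minimal sketch of that pattern, and of failing fast on the error slot, might look like this. tryCatch and insertOrThrow are hypothetical names, and the real helper's signature is not shown in this PR.

```typescript
// Either an error with no result, or a result with no error.
type Result<T> = [Error, null] | [null, T];

// Hypothetical wrapper that converts a thrown error into a tuple.
async function tryCatch<T>(operation: () => Promise<T>): Promise<Result<T>> {
  try {
    return [null, await operation()];
  } catch (error) {
    return [error instanceof Error ? error : new Error(String(error)), null];
  }
}

// Caller side: check the error slot before touching the result, so a
// failed insert is surfaced instead of being silently ignored.
async function insertOrThrow<T>(insert: () => Promise<T>): Promise<T> {
  const [insertError, insertResult] = await tryCatch(insert);
  if (insertError) {
    throw insertError;
  }
  return insertResult as T;
}
```

With this shape, forgetting to check insertError is the main way a failure could go unnoticed, so the check belongs directly after the call.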

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
   async #insertTaskRunInserts(taskRunInserts: TaskRunV2[]) {
     return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
-      const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert(
-        taskRunInserts,
+      const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertTaskRuns(
+        taskRunInserts,
         {
           params: {
             clickhouse_settings: {
               wait_for_async_insert: this.options.waitForAsyncInsert ? 1 : 0,
             },
           },
         }
       );

@ericallam ericallam merged commit 2deef00 into main May 14, 2025
12 checks passed
@ericallam ericallam deleted the runs-replication-perf-improvements branch May 14, 2025 10:36

3 participants