Improvements to the runs replication service to prevent buffer exhaustion #2047

Merged 1 commit on May 14, 2025
1 change: 1 addition & 0 deletions apps/webapp/app/env.server.ts
@@ -768,6 +768,7 @@ const EnvironmentSchema = z.object({
RUN_REPLICATION_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
RUN_REPLICATION_LEADER_LOCK_RETRY_COUNT: z.coerce.number().int().default(240),
RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: z.coerce.number().int().default(500),
+  RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT: z.string().default("0"),
});

export type Environment = z.infer<typeof EnvironmentSchema>;
1 change: 1 addition & 0 deletions apps/webapp/app/services/runsReplicationInstance.server.ts
@@ -49,6 +49,7 @@ function initializeRunsReplicationInstance() {
leaderLockRetryIntervalMs: env.RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS,
ackIntervalSeconds: env.RUN_REPLICATION_ACK_INTERVAL_SECONDS,
logLevel: env.RUN_REPLICATION_LOG_LEVEL,
+    waitForAsyncInsert: env.RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT === "1",
tracer: provider.getTracer("runs-replication-service"),
});

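A minimal sketch of how the new flag travels from the environment into ClickHouse settings, condensed from the hunk above and the service changes below. The declare line stands in for the parsed EnvironmentSchema and is an assumption of this sketch, not code from the PR.

// Illustrative condensation of the wiring; not a new module in the PR.
declare const env: { RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT: string };

// runsReplicationInstance.server.ts: the "0"/"1" string becomes a boolean option.
const waitForAsyncInsert = env.RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT === "1";

// runsReplicationService.server.ts: the boolean becomes a ClickHouse setting.
// 0 acknowledges an insert as soon as it lands in the async insert buffer;
// 1 (the previously hard-coded value) blocks until the buffer is flushed to
// the table, which is the waiting behaviour this PR makes opt-in.
const clickhouseSettings = {
  wait_for_async_insert: waitForAsyncInsert ? 1 : 0,
};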
90 changes: 48 additions & 42 deletions apps/webapp/app/services/runsReplicationService.server.ts
@@ -1,4 +1,4 @@
-import type { ClickHouse, RawTaskRunPayloadV1, TaskRunV1 } from "@internal/clickhouse";
+import type { ClickHouse, RawTaskRunPayloadV1, TaskRunV2 } from "@internal/clickhouse";
import { RedisOptions } from "@internal/redis";
import {
LogicalReplicationClient,
@@ -7,7 +7,7 @@ import {
type MessageUpdate,
type PgoutputMessage,
} from "@internal/replication";
-import { startSpan, trace, type Tracer } from "@internal/tracing";
+import { recordSpanError, startSpan, trace, type Tracer } from "@internal/tracing";
import { Logger, LogLevel } from "@trigger.dev/core/logger";
import { tryCatch } from "@trigger.dev/core/utils";
import { parsePacket } from "@trigger.dev/core/v3/utils/ioSerialization";
@@ -50,6 +50,7 @@ export type RunsReplicationServiceOptions = {
logger?: Logger;
logLevel?: LogLevel;
tracer?: Tracer;
+  waitForAsyncInsert?: boolean;
};

type TaskRunInsert = { _version: bigint; run: TaskRun; event: "insert" | "update" | "delete" };
@@ -107,6 +108,7 @@ export class RunsReplicationService {
ackIntervalSeconds: options.ackIntervalSeconds ?? 10,
leaderLockRetryCount: options.leaderLockRetryCount ?? 240,
leaderLockRetryIntervalMs: options.leaderLockRetryIntervalMs ?? 500,
+      tracer: options.tracer,
});

this._concurrentFlushScheduler = new ConcurrentFlushScheduler<TaskRunInsert>({
@@ -115,6 +117,7 @@
maxConcurrency: options.maxFlushConcurrency ?? 100,
callback: this.#flushBatch.bind(this),
logger: new Logger("ConcurrentFlushScheduler", options.logLevel ?? "info"),
+      tracer: options.tracer,
});

this._replicationClient.events.on("data", async ({ lsn, log, parseDuration }) => {
@@ -404,9 +407,6 @@

async #flushBatch(flushId: string, batch: Array<TaskRunInsert>) {
    if (batch.length === 0) {
-      this.logger.debug("No runs to flush", {
-        flushId,
-      });
      return;
    }

@@ -437,10 +437,8 @@
payloadInserts: payloadInserts.length,
});

-    await Promise.all([
-      this.#insertTaskRunInserts(taskRunInserts),
-      this.#insertPayloadInserts(payloadInserts),
-    ]);
+    await this.#insertTaskRunInserts(taskRunInserts);
+    await this.#insertPayloadInserts(payloadInserts);

this.logger.debug("Flushed inserts", {
flushId,
@@ -450,51 +448,59 @@
});
}

-  async #insertTaskRunInserts(taskRunInserts: TaskRunV1[]) {
-    const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert(
-      taskRunInserts,
-      {
-        params: {
-          clickhouse_settings: {
-            wait_for_async_insert: 1,
-          },
-        },
-      }
-    );
+  async #insertTaskRunInserts(taskRunInserts: TaskRunV2[]) {
+    return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
+      const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert(
+        taskRunInserts,
+        {
+          params: {
+            clickhouse_settings: {
+              wait_for_async_insert: this.options.waitForAsyncInsert ? 1 : 0,
+            },
+          },
+        }
+      );
Comment on lines +451 to +462 (Contributor):

⚠️ Potential issue

Potential typo – taskRuns.insert may not exist

The clickhouse helper exports insertTaskRuns, yet here we call this.options.clickhouse.taskRuns.insert(...).
If insert is not attached to taskRuns, this will throw at runtime.

Double-check the client initialisation. If the intended method is insertTaskRuns, update the call:

-      const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert(
-        taskRunInserts,
+      const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertTaskRuns(
+        taskRunInserts,

Failing fast here prevents silent data loss during replication.


-    if (insertError) {
-      this.logger.error("Error inserting task run inserts", {
-        error: insertError,
-      });
-    }
+      if (insertError) {
+        this.logger.error("Error inserting task run inserts", {
+          error: insertError,
+        });
+
+        recordSpanError(span, insertError);
+      }

-    return insertResult;
+      return insertResult;
+    });
  }

  async #insertPayloadInserts(payloadInserts: RawTaskRunPayloadV1[]) {
-    const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertPayloads(
-      payloadInserts,
-      {
-        params: {
-          clickhouse_settings: {
-            wait_for_async_insert: 1,
-          },
-        },
-      }
-    );
+    return await startSpan(this._tracer, "insertPayloadInserts", async (span) => {
+      const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertPayloads(
+        payloadInserts,
+        {
+          params: {
+            clickhouse_settings: {
+              wait_for_async_insert: this.options.waitForAsyncInsert ? 1 : 0,
+            },
+          },
+        }
+      );

-    if (insertError) {
-      this.logger.error("Error inserting payload inserts", {
-        error: insertError,
-      });
-    }
+      if (insertError) {
+        this.logger.error("Error inserting payload inserts", {
+          error: insertError,
+        });
+
+        recordSpanError(span, insertError);
+      }

-    return insertResult;
+      return insertResult;
+    });
  }

async #prepareRunInserts(
batchedRun: TaskRunInsert
-  ): Promise<{ taskRunInsert?: TaskRunV1; payloadInsert?: RawTaskRunPayloadV1 }> {
+  ): Promise<{ taskRunInsert?: TaskRunV2; payloadInsert?: RawTaskRunPayloadV1 }> {
this.logger.debug("Preparing run", {
batchedRun,
});
@@ -547,7 +553,7 @@
environmentType: string,
event: "insert" | "update" | "delete",
_version: bigint
-  ): Promise<TaskRunV1> {
+  ): Promise<TaskRunV2> {
const output = await this.#prepareJson(run.output, run.outputType);

return {
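Earlier in this file's diff, the service constructor passes a tracer into the ConcurrentFlushScheduler (maxConcurrency, a flush callback, and a logger are visible in that hunk). The scheduler's implementation is outside this PR, so the following is a rough, hypothetical sketch of the batching pattern such options imply: bounded batch size plus a concurrency cap, so a slow ClickHouse insert applies backpressure instead of letting the in-memory buffer grow without limit. Class and member names below are invented for illustration and are not the webapp's actual scheduler.

import { randomUUID } from "node:crypto";

type FlushCallback<T> = (flushId: string, batch: T[]) => Promise<void>;

// Hypothetical stand-in for the scheduler configured above; names are illustrative.
class SimpleFlushScheduler<T> {
  private buffer: T[] = [];
  private inFlight = 0;
  private readonly timer: ReturnType<typeof setInterval>;

  constructor(
    private readonly opts: {
      flushIntervalMs: number;
      maxBatchSize: number;
      maxConcurrency: number;
      callback: FlushCallback<T>;
    }
  ) {
    // Time-based flushing so small trickles of events still get written out.
    this.timer = setInterval(() => this.drain(), opts.flushIntervalMs);
  }

  enqueue(items: T[]) {
    this.buffer.push(...items);
    // Size-based flushing: drain as soon as a full batch has accumulated.
    if (this.buffer.length >= this.opts.maxBatchSize) {
      this.drain();
    }
  }

  private drain() {
    // The concurrency cap is the backpressure mechanism: when ClickHouse is
    // slow, extra batches wait in the buffer instead of fanning out in parallel.
    while (this.buffer.length > 0 && this.inFlight < this.opts.maxConcurrency) {
      const batch = this.buffer.splice(0, this.opts.maxBatchSize);
      this.inFlight++;
      this.opts
        .callback(randomUUID(), batch)
        .catch((err) => console.error("flush failed", err))
        .finally(() => {
          this.inFlight--;
          if (this.buffer.length > 0) this.drain();
        });
    }
  }

  shutdown() {
    clearInterval(this.timer);
    // A real implementation would also await in-flight flushes here.
    this.drain();
  }
}

In this sketch the memory held by batches actually being flushed is bounded by roughly maxBatchSize times maxConcurrency, which is the kind of ceiling the PR title's "buffer exhaustion" concern points at; the real scheduler may differ.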
20 changes: 10 additions & 10 deletions apps/webapp/test/runsReplicationService.test.ts
@@ -90,7 +90,7 @@ describe("RunsReplicationService", () => {
// Check that the row was replicated to clickhouse
const queryRuns = clickhouse.reader.query({
name: "runs-replication",
query: "SELECT * FROM trigger_dev.task_runs_v1",
query: "SELECT * FROM trigger_dev.task_runs_v2",
schema: z.any(),
});

@@ -279,7 +279,7 @@ describe("RunsReplicationService", () => {
// Query ClickHouse for the replicated run
const queryRuns = clickhouse.reader.query({
name: "runs-replication-batching",
query: "SELECT * FROM trigger_dev.task_runs_v1 WHERE run_id = {run_id:String}",
query: "SELECT * FROM trigger_dev.task_runs_v2 WHERE run_id = {run_id:String}",
schema: z.any(),
params: z.object({ run_id: z.string() }),
});
@@ -604,7 +604,7 @@ describe("RunsReplicationService", () => {
// Query ClickHouse for the replicated run
const queryRuns = clickhouse.reader.query({
name: "runs-replication-update",
query: "SELECT * FROM trigger_dev.task_runs_v1 FINAL WHERE run_id = {run_id:String}",
query: "SELECT * FROM trigger_dev.task_runs_v2 FINAL WHERE run_id = {run_id:String}",
schema: z.any(),
params: z.object({ run_id: z.string() }),
});
@@ -713,7 +713,7 @@ describe("RunsReplicationService", () => {
// Query ClickHouse for the replicated run using FINAL
const queryRuns = clickhouse.reader.query({
name: "runs-replication-delete",
query: "SELECT * FROM trigger_dev.task_runs_v1 FINAL WHERE run_id = {run_id:String}",
query: "SELECT * FROM trigger_dev.task_runs_v2 FINAL WHERE run_id = {run_id:String}",
schema: z.any(),
params: z.object({ run_id: z.string() }),
});
@@ -838,7 +838,7 @@ describe("RunsReplicationService", () => {
// Query ClickHouse for both runs using FINAL
const queryRuns = clickhouse.reader.query({
name: "runs-replication-shutdown-handover",
query: "SELECT * FROM trigger_dev.task_runs_v1 FINAL ORDER BY created_at ASC",
query: "SELECT * FROM trigger_dev.task_runs_v2 FINAL ORDER BY created_at ASC",
schema: z.any(),
});

@@ -966,7 +966,7 @@ describe("RunsReplicationService", () => {
// Query ClickHouse for the run using FINAL
const queryRuns = clickhouse.reader.query({
name: "runs-replication-shutdown-after-processed",
query: "SELECT * FROM trigger_dev.task_runs_v1 FINAL WHERE run_id = {run_id:String}",
query: "SELECT * FROM trigger_dev.task_runs_v2 FINAL WHERE run_id = {run_id:String}",
schema: z.any(),
params: z.object({ run_id: z.string() }),
});
@@ -1117,7 +1117,7 @@ describe("RunsReplicationService", () => {
// Query ClickHouse for all runs using FINAL
const queryRuns = clickhouse.reader.query({
name: "runs-replication-stress-bulk-insert",
-      query: `SELECT run_id, friendly_id, trace_id, task_identifier FROM trigger_dev.task_runs_v1 FINAL`,
+      query: `SELECT run_id, friendly_id, trace_id, task_identifier FROM trigger_dev.task_runs_v2 FINAL`,
schema: z.any(),
});

@@ -1236,7 +1236,7 @@ describe("RunsReplicationService", () => {
// Query ClickHouse for all runs using FINAL
const queryRuns = clickhouse.reader.query({
name: "runs-replication-stress-bulk-insert",
-      query: `SELECT * FROM trigger_dev.task_runs_v1 FINAL`,
+      query: `SELECT * FROM trigger_dev.task_runs_v2 FINAL`,
schema: z.any(),
});

@@ -1375,7 +1375,7 @@ describe("RunsReplicationService", () => {
// Query ClickHouse for both runs using FINAL
const queryRuns = clickhouse.reader.query({
name: "runs-replication-multi-event-tx",
-      query: `SELECT * FROM trigger_dev.task_runs_v1 FINAL WHERE run_id IN ({run_id_1:String}, {run_id_2:String})`,
+      query: `SELECT * FROM trigger_dev.task_runs_v2 FINAL WHERE run_id IN ({run_id_1:String}, {run_id_2:String})`,
schema: z.any(),
params: z.object({ run_id_1: z.string(), run_id_2: z.string() }),
});
@@ -1488,7 +1488,7 @@ describe("RunsReplicationService", () => {
// Query ClickHouse for all runs using FINAL
const queryRuns = clickhouse.reader.query({
name: "runs-replication-long-tx",
-      query: `SELECT * FROM trigger_dev.task_runs_v1 FINAL`,
+      query: `SELECT * FROM trigger_dev.task_runs_v2 FINAL`,
schema: z.any(),
});

94 changes: 94 additions & 0 deletions internal-packages/clickhouse/schema/004_create_task_runs_v2.sql
@@ -0,0 +1,94 @@
-- +goose Up

/*
This is the second version of the task runs table.
The main changes are that we've added organization_id and project_id to the sort key and removed toDate(created_at) and task_identifier from it.
We will add a skip index for the task_identifier column in a future migration.
*/
CREATE TABLE trigger_dev.task_runs_v2
(
/* ─── ids & hierarchy ─────────────────────────────────────── */
environment_id String,
organization_id String,
project_id String,
run_id String,

environment_type LowCardinality(String),
friendly_id String,
attempt UInt8 DEFAULT 1,

/* ─── enums / status ──────────────────────────────────────── */
engine LowCardinality(String),
status LowCardinality(String),

/* ─── queue / concurrency / schedule ─────────────────────── */
task_identifier String,
queue String,

schedule_id String,
batch_id String,

/* ─── related runs ─────────────────────────────────────────────── */
root_run_id String,
parent_run_id String,
depth UInt8 DEFAULT 0,

/* ─── telemetry ─────────────────────────────────────────────── */
span_id String,
trace_id String,
idempotency_key String,

/* ─── timing ─────────────────────────────────────────────── */
created_at DateTime64(3),
updated_at DateTime64(3),
started_at Nullable(DateTime64(3)),
executed_at Nullable(DateTime64(3)),
completed_at Nullable(DateTime64(3)),
delay_until Nullable(DateTime64(3)),
queued_at Nullable(DateTime64(3)),
expired_at Nullable(DateTime64(3)),
expiration_ttl String,

/* ─── cost / usage ───────────────────────────────────────── */
usage_duration_ms UInt32 DEFAULT 0,
cost_in_cents Float64 DEFAULT 0,
base_cost_in_cents Float64 DEFAULT 0,

/* ─── payload & context ──────────────────────────────────── */
output JSON(max_dynamic_paths = 1024),
error JSON(max_dynamic_paths = 64),

/* ─── tagging / versions ─────────────────────────────────── */
tags Array(String) CODEC(ZSTD(1)),
task_version String CODEC(LZ4),
sdk_version String CODEC(LZ4),
cli_version String CODEC(LZ4),
machine_preset LowCardinality(String) CODEC(LZ4),

is_test UInt8 DEFAULT 0,

/* ─── commit lsn ─────────────────────────────────────────────── */
_version UInt64,
_is_deleted UInt8 DEFAULT 0
)
ENGINE = ReplacingMergeTree(_version, _is_deleted)
PARTITION BY toYYYYMM(created_at)
ORDER BY (organization_id, project_id, environment_id, created_at, run_id)
SETTINGS enable_json_type = 1;

/* Fast tag filtering */
ALTER TABLE trigger_dev.task_runs_v2
ADD INDEX idx_tags tags TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4;

CREATE VIEW trigger_dev.tmp_eric_task_runs_full_v2 AS
SELECT
s.*,
p.payload as payload
FROM trigger_dev.task_runs_v2 AS s FINAL
LEFT JOIN trigger_dev.raw_task_runs_payload_v1 AS p ON s.run_id = p.run_id
SETTINGS enable_json_type = 1;


-- +goose Down
DROP TABLE IF EXISTS trigger_dev.task_runs_v2;
DROP VIEW IF EXISTS trigger_dev.tmp_eric_task_runs_full_v2;
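The comment at the top of this migration explains the new sort key. As an illustration of the access pattern it is shaped for, here is a hypothetical query in the style of the reader.query calls in the tests: it filters on the leading sort-key columns so the primary index can skip unrelated granules. The clickhouse client handle and the tuple-shaped return value are assumptions borrowed from the surrounding code, not part of this PR.

import { z } from "zod";

// Assumed: a configured client exposing `reader.query`, as in the tests above.
declare const clickhouse: any;

const queryRecentRuns = clickhouse.reader.query({
  name: "runs-by-environment",
  // organization_id, project_id and environment_id are the leading columns of
  // ORDER BY (organization_id, project_id, environment_id, created_at, run_id),
  // so this WHERE clause lines up with the table's primary index.
  query: `
    SELECT run_id, status, created_at
    FROM trigger_dev.task_runs_v2 FINAL
    WHERE organization_id = {org_id:String}
      AND project_id = {project_id:String}
      AND environment_id = {env_id:String}
    ORDER BY created_at DESC
    LIMIT 100
  `,
  schema: z.any(),
  params: z.object({ org_id: z.string(), project_id: z.string(), env_id: z.string() }),
});

// The call shape ([error, rows]) mirrors the insert helpers above; treat it as an assumption.
const [queryError, rows] = await queryRecentRuns({
  org_id: "org_123",
  project_id: "proj_123",
  env_id: "env_123",
});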
6 changes: 3 additions & 3 deletions internal-packages/clickhouse/src/taskRuns.test.ts
@@ -3,7 +3,7 @@ import { z } from "zod";
import { ClickhouseClient } from "./client/client.js";
import { insertRawTaskRunPayloads, insertTaskRuns } from "./taskRuns.js";

describe("Task Runs V1", () => {
describe("Task Runs V2", () => {
clickhouseTest("should be able to insert task runs", async ({ clickhouseContainer }) => {
const client = new ClickhouseClient({
name: "test",
@@ -70,7 +70,7 @@ describe("Task Runs V1", () => {

const query = client.query({
name: "query-task-runs",
query: "SELECT * FROM trigger_dev.task_runs_v1",
query: "SELECT * FROM trigger_dev.task_runs_v2",
schema: z.object({
environment_id: z.string(),
run_id: z.string(),
@@ -226,7 +226,7 @@ describe("Task Runs V1", () => {

const query = client.query({
name: "query-task-runs",
query: "SELECT * FROM trigger_dev.task_runs_v1 FINAL",
query: "SELECT * FROM trigger_dev.task_runs_v2 FINAL",
schema: z.object({
environment_id: z.string(),
run_id: z.string(),