Skip to content

Commit 71aa4a0

Browse files
committed
Improvements to the runs replication service to prevent buffer exhaustion
1 parent 7ace1c3 commit 71aa4a0

File tree

7 files changed

+161
-59
lines changed

7 files changed

+161
-59
lines changed

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -768,6 +768,7 @@ const EnvironmentSchema = z.object({
768768
RUN_REPLICATION_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
769769
RUN_REPLICATION_LEADER_LOCK_RETRY_COUNT: z.coerce.number().int().default(240),
770770
RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: z.coerce.number().int().default(500),
771+
RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT: z.string().default("0"),
771772
});
772773

773774
export type Environment = z.infer<typeof EnvironmentSchema>;

apps/webapp/app/services/runsReplicationInstance.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ function initializeRunsReplicationInstance() {
4949
leaderLockRetryIntervalMs: env.RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS,
5050
ackIntervalSeconds: env.RUN_REPLICATION_ACK_INTERVAL_SECONDS,
5151
logLevel: env.RUN_REPLICATION_LOG_LEVEL,
52+
waitForAsyncInsert: env.RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT === "1",
5253
tracer: provider.getTracer("runs-replication-service"),
5354
});
5455

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 48 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { ClickHouse, RawTaskRunPayloadV1, TaskRunV1 } from "@internal/clickhouse";
1+
import type { ClickHouse, RawTaskRunPayloadV1, TaskRunV2 } from "@internal/clickhouse";
22
import { RedisOptions } from "@internal/redis";
33
import {
44
LogicalReplicationClient,
@@ -7,7 +7,7 @@ import {
77
type MessageUpdate,
88
type PgoutputMessage,
99
} from "@internal/replication";
10-
import { startSpan, trace, type Tracer } from "@internal/tracing";
10+
import { recordSpanError, startSpan, trace, type Tracer } from "@internal/tracing";
1111
import { Logger, LogLevel } from "@trigger.dev/core/logger";
1212
import { tryCatch } from "@trigger.dev/core/utils";
1313
import { parsePacket } from "@trigger.dev/core/v3/utils/ioSerialization";
@@ -50,6 +50,7 @@ export type RunsReplicationServiceOptions = {
5050
logger?: Logger;
5151
logLevel?: LogLevel;
5252
tracer?: Tracer;
53+
waitForAsyncInsert?: boolean;
5354
};
5455

5556
type TaskRunInsert = { _version: bigint; run: TaskRun; event: "insert" | "update" | "delete" };
@@ -107,6 +108,7 @@ export class RunsReplicationService {
107108
ackIntervalSeconds: options.ackIntervalSeconds ?? 10,
108109
leaderLockRetryCount: options.leaderLockRetryCount ?? 240,
109110
leaderLockRetryIntervalMs: options.leaderLockRetryIntervalMs ?? 500,
111+
tracer: options.tracer,
110112
});
111113

112114
this._concurrentFlushScheduler = new ConcurrentFlushScheduler<TaskRunInsert>({
@@ -115,6 +117,7 @@ export class RunsReplicationService {
115117
maxConcurrency: options.maxFlushConcurrency ?? 100,
116118
callback: this.#flushBatch.bind(this),
117119
logger: new Logger("ConcurrentFlushScheduler", options.logLevel ?? "info"),
120+
tracer: options.tracer,
118121
});
119122

120123
this._replicationClient.events.on("data", async ({ lsn, log, parseDuration }) => {
@@ -404,9 +407,6 @@ export class RunsReplicationService {
404407

405408
async #flushBatch(flushId: string, batch: Array<TaskRunInsert>) {
406409
if (batch.length === 0) {
407-
this.logger.debug("No runs to flush", {
408-
flushId,
409-
});
410410
return;
411411
}
412412

@@ -437,10 +437,8 @@ export class RunsReplicationService {
437437
payloadInserts: payloadInserts.length,
438438
});
439439

440-
await Promise.all([
441-
this.#insertTaskRunInserts(taskRunInserts),
442-
this.#insertPayloadInserts(payloadInserts),
443-
]);
440+
await this.#insertTaskRunInserts(taskRunInserts);
441+
await this.#insertPayloadInserts(payloadInserts);
444442

445443
this.logger.debug("Flushed inserts", {
446444
flushId,
@@ -450,51 +448,59 @@ export class RunsReplicationService {
450448
});
451449
}
452450

453-
async #insertTaskRunInserts(taskRunInserts: TaskRunV1[]) {
454-
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert(
455-
taskRunInserts,
456-
{
457-
params: {
458-
clickhouse_settings: {
459-
wait_for_async_insert: 1,
451+
async #insertTaskRunInserts(taskRunInserts: TaskRunV2[]) {
452+
return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
453+
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert(
454+
taskRunInserts,
455+
{
456+
params: {
457+
clickhouse_settings: {
458+
wait_for_async_insert: this.options.waitForAsyncInsert ? 1 : 0,
459+
},
460460
},
461-
},
462-
}
463-
);
461+
}
462+
);
464463

465-
if (insertError) {
466-
this.logger.error("Error inserting task run inserts", {
467-
error: insertError,
468-
});
469-
}
464+
if (insertError) {
465+
this.logger.error("Error inserting task run inserts", {
466+
error: insertError,
467+
});
468+
469+
recordSpanError(span, insertError);
470+
}
470471

471-
return insertResult;
472+
return insertResult;
473+
});
472474
}
473475

474476
async #insertPayloadInserts(payloadInserts: RawTaskRunPayloadV1[]) {
475-
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertPayloads(
476-
payloadInserts,
477-
{
478-
params: {
479-
clickhouse_settings: {
480-
wait_for_async_insert: 1,
477+
return await startSpan(this._tracer, "insertPayloadInserts", async (span) => {
478+
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertPayloads(
479+
payloadInserts,
480+
{
481+
params: {
482+
clickhouse_settings: {
483+
wait_for_async_insert: this.options.waitForAsyncInsert ? 1 : 0,
484+
},
481485
},
482-
},
483-
}
484-
);
486+
}
487+
);
485488

486-
if (insertError) {
487-
this.logger.error("Error inserting payload inserts", {
488-
error: insertError,
489-
});
490-
}
489+
if (insertError) {
490+
this.logger.error("Error inserting payload inserts", {
491+
error: insertError,
492+
});
491493

492-
return insertResult;
494+
recordSpanError(span, insertError);
495+
}
496+
497+
return insertResult;
498+
});
493499
}
494500

495501
async #prepareRunInserts(
496502
batchedRun: TaskRunInsert
497-
): Promise<{ taskRunInsert?: TaskRunV1; payloadInsert?: RawTaskRunPayloadV1 }> {
503+
): Promise<{ taskRunInsert?: TaskRunV2; payloadInsert?: RawTaskRunPayloadV1 }> {
498504
this.logger.debug("Preparing run", {
499505
batchedRun,
500506
});
@@ -547,7 +553,7 @@ export class RunsReplicationService {
547553
environmentType: string,
548554
event: "insert" | "update" | "delete",
549555
_version: bigint
550-
): Promise<TaskRunV1> {
556+
): Promise<TaskRunV2> {
551557
const output = await this.#prepareJson(run.output, run.outputType);
552558

553559
return {

apps/webapp/test/runsReplicationService.test.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ describe("RunsReplicationService", () => {
9090
// Check that the row was replicated to clickhouse
9191
const queryRuns = clickhouse.reader.query({
9292
name: "runs-replication",
93-
query: "SELECT * FROM trigger_dev.task_runs_v1",
93+
query: "SELECT * FROM trigger_dev.task_runs_v2",
9494
schema: z.any(),
9595
});
9696

@@ -279,7 +279,7 @@ describe("RunsReplicationService", () => {
279279
// Query ClickHouse for the replicated run
280280
const queryRuns = clickhouse.reader.query({
281281
name: "runs-replication-batching",
282-
query: "SELECT * FROM trigger_dev.task_runs_v1 WHERE run_id = {run_id:String}",
282+
query: "SELECT * FROM trigger_dev.task_runs_v2 WHERE run_id = {run_id:String}",
283283
schema: z.any(),
284284
params: z.object({ run_id: z.string() }),
285285
});
@@ -604,7 +604,7 @@ describe("RunsReplicationService", () => {
604604
// Query ClickHouse for the replicated run
605605
const queryRuns = clickhouse.reader.query({
606606
name: "runs-replication-update",
607-
query: "SELECT * FROM trigger_dev.task_runs_v1 FINAL WHERE run_id = {run_id:String}",
607+
query: "SELECT * FROM trigger_dev.task_runs_v2 FINAL WHERE run_id = {run_id:String}",
608608
schema: z.any(),
609609
params: z.object({ run_id: z.string() }),
610610
});
@@ -713,7 +713,7 @@ describe("RunsReplicationService", () => {
713713
// Query ClickHouse for the replicated run using FINAL
714714
const queryRuns = clickhouse.reader.query({
715715
name: "runs-replication-delete",
716-
query: "SELECT * FROM trigger_dev.task_runs_v1 FINAL WHERE run_id = {run_id:String}",
716+
query: "SELECT * FROM trigger_dev.task_runs_v2 FINAL WHERE run_id = {run_id:String}",
717717
schema: z.any(),
718718
params: z.object({ run_id: z.string() }),
719719
});
@@ -838,7 +838,7 @@ describe("RunsReplicationService", () => {
838838
// Query ClickHouse for both runs using FINAL
839839
const queryRuns = clickhouse.reader.query({
840840
name: "runs-replication-shutdown-handover",
841-
query: "SELECT * FROM trigger_dev.task_runs_v1 FINAL ORDER BY created_at ASC",
841+
query: "SELECT * FROM trigger_dev.task_runs_v2 FINAL ORDER BY created_at ASC",
842842
schema: z.any(),
843843
});
844844

@@ -966,7 +966,7 @@ describe("RunsReplicationService", () => {
966966
// Query ClickHouse for the run using FINAL
967967
const queryRuns = clickhouse.reader.query({
968968
name: "runs-replication-shutdown-after-processed",
969-
query: "SELECT * FROM trigger_dev.task_runs_v1 FINAL WHERE run_id = {run_id:String}",
969+
query: "SELECT * FROM trigger_dev.task_runs_v2 FINAL WHERE run_id = {run_id:String}",
970970
schema: z.any(),
971971
params: z.object({ run_id: z.string() }),
972972
});
@@ -1117,7 +1117,7 @@ describe("RunsReplicationService", () => {
11171117
// Query ClickHouse for all runs using FINAL
11181118
const queryRuns = clickhouse.reader.query({
11191119
name: "runs-replication-stress-bulk-insert",
1120-
query: `SELECT run_id, friendly_id, trace_id, task_identifier FROM trigger_dev.task_runs_v1 FINAL`,
1120+
query: `SELECT run_id, friendly_id, trace_id, task_identifier FROM trigger_dev.task_runs_v2 FINAL`,
11211121
schema: z.any(),
11221122
});
11231123

@@ -1236,7 +1236,7 @@ describe("RunsReplicationService", () => {
12361236
// Query ClickHouse for all runs using FINAL
12371237
const queryRuns = clickhouse.reader.query({
12381238
name: "runs-replication-stress-bulk-insert",
1239-
query: `SELECT * FROM trigger_dev.task_runs_v1 FINAL`,
1239+
query: `SELECT * FROM trigger_dev.task_runs_v2 FINAL`,
12401240
schema: z.any(),
12411241
});
12421242

@@ -1375,7 +1375,7 @@ describe("RunsReplicationService", () => {
13751375
// Query ClickHouse for both runs using FINAL
13761376
const queryRuns = clickhouse.reader.query({
13771377
name: "runs-replication-multi-event-tx",
1378-
query: `SELECT * FROM trigger_dev.task_runs_v1 FINAL WHERE run_id IN ({run_id_1:String}, {run_id_2:String})`,
1378+
query: `SELECT * FROM trigger_dev.task_runs_v2 FINAL WHERE run_id IN ({run_id_1:String}, {run_id_2:String})`,
13791379
schema: z.any(),
13801380
params: z.object({ run_id_1: z.string(), run_id_2: z.string() }),
13811381
});
@@ -1488,7 +1488,7 @@ describe("RunsReplicationService", () => {
14881488
// Query ClickHouse for all runs using FINAL
14891489
const queryRuns = clickhouse.reader.query({
14901490
name: "runs-replication-long-tx",
1491-
query: `SELECT * FROM trigger_dev.task_runs_v1 FINAL`,
1491+
query: `SELECT * FROM trigger_dev.task_runs_v2 FINAL`,
14921492
schema: z.any(),
14931493
});
14941494

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
-- +goose Up
2+
3+
/*
4+
This is the second version of the task runs table.
5+
The main change is we've added organization_id and project_id to the sort key, and removed the toDate(created_at) and task_identifier columns from the sort key.
6+
We will add a skip index for the task_identifier column in a future migration.
7+
*/
8+
CREATE TABLE trigger_dev.task_runs_v2
9+
(
10+
/* ─── ids & hierarchy ─────────────────────────────────────── */
11+
environment_id String,
12+
organization_id String,
13+
project_id String,
14+
run_id String,
15+
16+
environment_type LowCardinality(String),
17+
friendly_id String,
18+
attempt UInt8 DEFAULT 1,
19+
20+
/* ─── enums / status ──────────────────────────────────────── */
21+
engine LowCardinality(String),
22+
status LowCardinality(String),
23+
24+
/* ─── queue / concurrency / schedule ─────────────────────── */
25+
task_identifier String,
26+
queue String,
27+
28+
schedule_id String,
29+
batch_id String,
30+
31+
/* ─── related runs ─────────────────────────────────────────────── */
32+
root_run_id String,
33+
parent_run_id String,
34+
depth UInt8 DEFAULT 0,
35+
36+
/* ─── telemetry ─────────────────────────────────────────────── */
37+
span_id String,
38+
trace_id String,
39+
idempotency_key String,
40+
41+
/* ─── timing ─────────────────────────────────────────────── */
42+
created_at DateTime64(3),
43+
updated_at DateTime64(3),
44+
started_at Nullable(DateTime64(3)),
45+
executed_at Nullable(DateTime64(3)),
46+
completed_at Nullable(DateTime64(3)),
47+
delay_until Nullable(DateTime64(3)),
48+
queued_at Nullable(DateTime64(3)),
49+
expired_at Nullable(DateTime64(3)),
50+
expiration_ttl String,
51+
52+
/* ─── cost / usage ───────────────────────────────────────── */
53+
usage_duration_ms UInt32 DEFAULT 0,
54+
cost_in_cents Float64 DEFAULT 0,
55+
base_cost_in_cents Float64 DEFAULT 0,
56+
57+
/* ─── payload & context ──────────────────────────────────── */
58+
output JSON(max_dynamic_paths = 1024),
59+
error JSON(max_dynamic_paths = 64),
60+
61+
/* ─── tagging / versions ─────────────────────────────────── */
62+
tags Array(String) CODEC(ZSTD(1)),
63+
task_version String CODEC(LZ4),
64+
sdk_version String CODEC(LZ4),
65+
cli_version String CODEC(LZ4),
66+
machine_preset LowCardinality(String) CODEC(LZ4),
67+
68+
is_test UInt8 DEFAULT 0,
69+
70+
/* ─── commit lsn ─────────────────────────────────────────────── */
71+
_version UInt64,
72+
_is_deleted UInt8 DEFAULT 0
73+
)
74+
ENGINE = ReplacingMergeTree(_version, _is_deleted)
75+
PARTITION BY toYYYYMM(created_at)
76+
ORDER BY (organization_id, project_id, environment_id, created_at, run_id)
77+
SETTINGS enable_json_type = 1;
78+
79+
/* Fast tag filtering */
80+
ALTER TABLE trigger_dev.task_runs_v2
81+
ADD INDEX idx_tags tags TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4;
82+
83+
CREATE VIEW trigger_dev.tmp_eric_task_runs_full_v2 AS
84+
SELECT
85+
s.*,
86+
p.payload as payload
87+
FROM trigger_dev.task_runs_v2 AS s FINAL
88+
LEFT JOIN trigger_dev.raw_task_runs_payload_v1 AS p ON s.run_id = p.run_id
89+
SETTINGS enable_json_type = 1;
90+
91+
92+
-- +goose Down
93+
DROP TABLE IF EXISTS trigger_dev.task_runs_v2;
94+
DROP VIEW IF EXISTS trigger_dev.tmp_eric_task_runs_full_v2

internal-packages/clickhouse/src/taskRuns.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { z } from "zod";
33
import { ClickhouseClient } from "./client/client.js";
44
import { insertRawTaskRunPayloads, insertTaskRuns } from "./taskRuns.js";
55

6-
describe("Task Runs V1", () => {
6+
describe("Task Runs V2", () => {
77
clickhouseTest("should be able to insert task runs", async ({ clickhouseContainer }) => {
88
const client = new ClickhouseClient({
99
name: "test",
@@ -70,7 +70,7 @@ describe("Task Runs V1", () => {
7070

7171
const query = client.query({
7272
name: "query-task-runs",
73-
query: "SELECT * FROM trigger_dev.task_runs_v1",
73+
query: "SELECT * FROM trigger_dev.task_runs_v2",
7474
schema: z.object({
7575
environment_id: z.string(),
7676
run_id: z.string(),
@@ -226,7 +226,7 @@ describe("Task Runs V1", () => {
226226

227227
const query = client.query({
228228
name: "query-task-runs",
229-
query: "SELECT * FROM trigger_dev.task_runs_v1 FINAL",
229+
query: "SELECT * FROM trigger_dev.task_runs_v2 FINAL",
230230
schema: z.object({
231231
environment_id: z.string(),
232232
run_id: z.string(),

0 commit comments

Comments
 (0)