Skip to content

Commit 3349897

Browse files
committed
Track run events in the run engine
1 parent bae987d commit 3349897

File tree

13 files changed

+131
-37
lines changed

13 files changed

+131
-37
lines changed
Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,21 @@
1-
import { singleton } from "~/utils/singleton";
21
import { ClickHouse } from "@internal/clickhouse";
2+
import { EventEmitter } from "node:events";
3+
import { prisma } from "~/db.server";
4+
import { singleton } from "~/utils/singleton";
5+
import { engine } from "~/v3/runEngine.server";
6+
import { logger } from "./logger.server";
37
import {
48
RunDashboardEventBus,
59
RunDashboardEvents,
610
RunsDashboardService,
711
} from "./runsDashboardService.server";
8-
import { EventEmitter } from "node:events";
9-
import { RuntimeEnvironmentType, TaskRun } from "@trigger.dev/database";
10-
import { engine } from "~/v3/runEngine.server";
11-
import { logger } from "./logger.server";
1212

1313
const runDashboardEventBus: RunDashboardEventBus = new EventEmitter<RunDashboardEvents>();
1414

15-
export type TaskRunStatusUpdateEnvironment = {
16-
type: RuntimeEnvironmentType;
17-
organizationId: string;
18-
};
19-
20-
export function emitRunStatusUpdate(run: TaskRun, environment: TaskRunStatusUpdateEnvironment) {
15+
export function emitRunStatusUpdate(runId: string) {
2116
runDashboardEventBus.emit("runStatusUpdate", {
22-
run,
23-
environment,
24-
organization: { id: environment.organizationId },
17+
time: new Date(),
18+
runId,
2519
});
2620
}
2721

@@ -31,16 +25,46 @@ export const runsDashboard = singleton("runsDashboard", () => {
3125
const service = new RunsDashboardService(clickhouse);
3226

3327
runDashboardEventBus.on("runStatusUpdate", async (event) => {
34-
await service.upsertRun(event.run, event.environment.type, event.organization.id);
28+
await upsertRun(event.time, event.runId, service);
3529
});
3630

3731
engine.eventBus.on("runStatusChanged", async (event) => {
38-
logger.debug("RunDashboard: runStatusChanged", {
39-
event,
40-
});
41-
42-
await service.upsertRun(event.run, event.environment.type, event.environment.organization.id);
32+
await upsertRun(event.time, event.runId, service);
4333
});
4434

4535
return service;
4636
});
37+
38+
async function upsertRun(time: Date, runId: string, service: RunsDashboardService) {
39+
const run = await prisma.taskRun.findFirst({
40+
where: {
41+
id: runId,
42+
},
43+
});
44+
45+
if (!run) {
46+
logger.error("RunDashboard: upsertRun: run not found", {
47+
runId,
48+
});
49+
50+
return;
51+
}
52+
53+
if (!run.environmentType) {
54+
logger.error("RunDashboard: upsertRun: run environment type not found", {
55+
runId,
56+
});
57+
58+
return;
59+
}
60+
61+
if (!run.organizationId) {
62+
logger.error("RunDashboard: upsertRun: run organization id not found", {
63+
runId,
64+
});
65+
66+
return;
67+
}
68+
69+
await service.upsertRun(time, run, run.environmentType, run.organizationId);
70+
}

apps/webapp/app/services/runsDashboardService.server.ts

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ export class RunsDashboardService {
1313
});
1414

1515
async upsertRun(
16+
eventTime: Date,
1617
taskRun: TaskRun,
1718
environmentType: RuntimeEnvironmentType,
1819
organizationId: string
@@ -36,7 +37,7 @@ export class RunsDashboardService {
3637
queue: taskRun.queue,
3738
schedule_id: taskRun.scheduleId ?? undefined,
3839
batch_id: taskRun.batchId ?? undefined,
39-
event_time: Date.now(),
40+
event_time: eventTime.getTime(),
4041
created_at: taskRun.createdAt.getTime(),
4142
updated_at: taskRun.updatedAt.getTime(),
4243
completed_at: taskRun.completedAt ? taskRun.completedAt.getTime() : undefined,
@@ -120,13 +121,8 @@ export class RunsDashboardService {
120121
export type RunDashboardEvents = {
121122
runStatusUpdate: [
122123
{
123-
run: TaskRun;
124-
organization: {
125-
id: string;
126-
};
127-
environment: {
128-
type: RuntimeEnvironmentType;
129-
};
124+
time: Date;
125+
runId: string;
130126
}
131127
];
132128
};

apps/webapp/app/v3/services/triggerTaskV1.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,8 @@ export class TriggerTaskServiceV1 extends BaseService {
376376
number: num,
377377
friendlyId: runFriendlyId,
378378
runtimeEnvironmentId: environment.id,
379+
environmentType: environment.type,
380+
organizationId: environment.organizationId,
379381
projectId: environment.projectId,
380382
idempotencyKey,
381383
idempotencyKeyExpiresAt: idempotencyKey ? idempotencyKeyExpiresAt : undefined,
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
-- AlterTable
2+
ALTER TABLE "TaskRun" ADD COLUMN "environmentType" "RuntimeEnvironmentType",
3+
ADD COLUMN "organizationId" TEXT;
4+

internal-packages/database/prisma/schema.prisma

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1731,9 +1731,13 @@ model TaskRun {
17311731
runtimeEnvironment RuntimeEnvironment @relation(fields: [runtimeEnvironmentId], references: [id], onDelete: Cascade, onUpdate: Cascade)
17321732
runtimeEnvironmentId String
17331733
1734+
environmentType RuntimeEnvironmentType?
1735+
17341736
project Project @relation(fields: [projectId], references: [id], onDelete: Cascade, onUpdate: Cascade)
17351737
projectId String
17361738
1739+
organizationId String?
1740+
17371741
// The specific queue this run is in
17381742
queue String
17391743
// The queueId is set when the run is locked to a specific queue

internal-packages/run-engine/src/engine/eventBus.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@ export type EventBusEvents = {
77
runStatusChanged: [
88
{
99
time: Date;
10-
run: TaskRun;
11-
environment: MinimalAuthenticatedEnvironment;
10+
runId: string;
1211
},
1312
];
1413
runAttemptStarted: [

internal-packages/run-engine/src/engine/index.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,8 @@ export class RunEngine {
403403
number,
404404
friendlyId,
405405
runtimeEnvironmentId: environment.id,
406+
environmentType: environment.type,
407+
organizationId: environment.organization.id,
406408
projectId: environment.project.id,
407409
idempotencyKey,
408410
idempotencyKeyExpiresAt,
@@ -550,8 +552,7 @@ export class RunEngine {
550552

551553
this.eventBus.emit("runStatusChanged", {
552554
time: new Date(),
553-
run: taskRun,
554-
environment,
555+
runId: taskRun.id,
555556
});
556557

557558
return taskRun;

internal-packages/run-engine/src/engine/systems/checkpointSystem.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,11 @@ export class CheckpointSystem {
143143
throw new ServiceValidationError("Run not found", 404);
144144
}
145145

146+
this.$.eventBus.emit("runStatusChanged", {
147+
time: new Date(),
148+
runId,
149+
});
150+
146151
// Create the checkpoint
147152
const taskRunCheckpoint = await prisma.taskRunCheckpoint.create({
148153
data: {
@@ -272,6 +277,11 @@ export class CheckpointSystem {
272277
throw new ServiceValidationError("Run not found", 404);
273278
}
274279

280+
this.$.eventBus.emit("runStatusChanged", {
281+
time: new Date(),
282+
runId,
283+
});
284+
275285
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, {
276286
run,
277287
snapshot: {

internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@ export class DelayedRunSystem {
6868

6969
await this.$.worker.reschedule(`enqueueDelayedRun:${updatedRun.id}`, delayUntil);
7070

71+
this.$.eventBus.emit("runStatusChanged", {
72+
time: new Date(),
73+
runId: updatedRun.id,
74+
});
75+
7176
return updatedRun;
7277
});
7378
},
@@ -109,6 +114,11 @@ export class DelayedRunSystem {
109114
},
110115
});
111116

117+
this.$.eventBus.emit("runStatusChanged", {
118+
time: new Date(),
119+
runId,
120+
});
121+
112122
if (run.ttl) {
113123
const expireAt = parseNaturalLanguageDuration(run.ttl);
114124

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,11 @@ export class DequeueSystem {
356356
},
357357
});
358358

359+
this.$.eventBus.emit("runStatusChanged", {
360+
time: new Date(),
361+
runId,
362+
});
363+
359364
if (!lockedTaskRun) {
360365
this.$.logger.error(
361366
"RunEngine.dequeueFromMasterQueue(): Failed to lock task run",
@@ -573,6 +578,11 @@ export class DequeueSystem {
573578

574579
//we ack because when it's deployed it will be requeued
575580
await this.$.runQueue.acknowledgeMessage(orgId, runId);
581+
582+
this.$.eventBus.emit("runStatusChanged", {
583+
time: new Date(),
584+
runId,
585+
});
576586
});
577587
},
578588
{

0 commit comments

Comments
 (0)