Skip to content

Commit d6fd7f5

Browse files
authored
Run engine: Using root queue timestamp to prioritize completing runs (#1818)
* Improve the new run engine Trigger/Batch trigger service class names * centralize queue timestamp logic in EnqueueSystem, adding queueTimestamp support and propagation * Fixing the create tags in the new run engine trigger task service
1 parent 4fe1d49 commit d6fd7f5

File tree

16 files changed

+321
-160
lines changed

16 files changed

+321
-160
lines changed

apps/webapp/app/models/taskRunTag.server.ts

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
import { prisma } from "~/db.server";
22
import { generateFriendlyId } from "~/v3/friendlyIdentifiers";
3+
import { PrismaClientOrTransaction } from "@trigger.dev/database";
34

45
export const MAX_TAGS_PER_RUN = 10;
56

6-
export async function createTag({ tag, projectId }: { tag: string; projectId: string }) {
7+
export async function createTag(
8+
{ tag, projectId }: { tag: string; projectId: string },
9+
prismaClient: PrismaClientOrTransaction = prisma
10+
) {
711
if (tag.trim().length === 0) return;
8-
return prisma.taskRunTag.upsert({
12+
return prismaClient.taskRunTag.upsert({
913
where: {
1014
projectId_name: {
1115
projectId: projectId,
@@ -21,6 +25,48 @@ export async function createTag({ tag, projectId }: { tag: string; projectId: st
2125
});
2226
}
2327

28+
export type TagRecord = {
29+
id: string;
30+
name: string;
31+
};
32+
33+
export async function createTags(
34+
{
35+
tags,
36+
projectId,
37+
}: {
38+
tags: string | string[] | undefined;
39+
projectId: string;
40+
},
41+
prismaClient: PrismaClientOrTransaction = prisma
42+
): Promise<TagRecord[]> {
43+
if (!tags) {
44+
return [];
45+
}
46+
47+
const tagsArray = typeof tags === "string" ? [tags] : tags;
48+
49+
if (tagsArray.length === 0) {
50+
return [];
51+
}
52+
53+
const tagRecords: TagRecord[] = [];
54+
for (const tag of tagsArray) {
55+
const tagRecord = await createTag(
56+
{
57+
tag,
58+
projectId,
59+
},
60+
prismaClient
61+
);
62+
if (tagRecord) {
63+
tagRecords.push({ id: tagRecord.id, name: tagRecord.name });
64+
}
65+
}
66+
67+
return tagRecords;
68+
}
69+
2470
export async function getTagsForRunId({
2571
friendlyId,
2672
environmentId,

apps/webapp/app/routes/api.v2.tasks.batch.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ import { logger } from "~/services/logger.server";
1010
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
1111
import { ServiceValidationError } from "~/v3/services/baseService.server";
1212
import { BatchProcessingStrategy } from "~/v3/services/batchTriggerV3.server";
13-
import { BatchTriggerV4Service } from "~/v3/services/batchTriggerV4.server";
1413
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
1514
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
15+
import { RunEngineBatchTriggerService } from "~/runEngine/services/batchTrigger.server";
1616

1717
const { action, loader } = createActionApiRoute(
1818
{
@@ -74,7 +74,7 @@ const { action, loader } = createActionApiRoute(
7474
? { traceparent, tracestate }
7575
: undefined;
7676

77-
const service = new BatchTriggerV4Service(batchProcessingStrategy ?? undefined);
77+
const service = new RunEngineBatchTriggerService(batchProcessingStrategy ?? undefined);
7878

7979
try {
8080
const batch = await service.call(authentication.environment, body, {

apps/webapp/app/v3/services/batchTriggerV4.server.ts renamed to apps/webapp/app/runEngine/services/batchTrigger.server.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1515
import { logger } from "~/services/logger.server";
1616
import { getEntitlement } from "~/services/platform.v3.server";
1717
import { workerQueue } from "~/services/worker.server";
18-
import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../r2.server";
19-
import { startActiveSpan } from "../tracer.server";
20-
import { ServiceValidationError, WithRunEngine } from "./baseService.server";
21-
import { OutOfEntitlementError, TriggerTaskService } from "./triggerTask.server";
18+
import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../../v3/r2.server";
19+
import { startActiveSpan } from "../../v3/tracer.server";
20+
import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server";
21+
import { OutOfEntitlementError, TriggerTaskService } from "../../v3/services/triggerTask.server";
2222

2323
const PROCESSING_BATCH_SIZE = 50;
2424
const ASYNC_BATCH_PROCESS_SIZE_THRESHOLD = 20;
@@ -49,7 +49,7 @@ export type BatchTriggerTaskServiceOptions = {
4949
/**
5050
* Larger batches, used in Run Engine v2
5151
*/
52-
export class BatchTriggerV4Service extends WithRunEngine {
52+
export class RunEngineBatchTriggerService extends WithRunEngine {
5353
private _batchProcessingStrategy: BatchProcessingStrategy;
5454

5555
constructor(
@@ -643,7 +643,7 @@ export class BatchTriggerV4Service extends WithRunEngine {
643643
}
644644

645645
async #enqueueBatchTaskRun(options: BatchProcessingOptions, tx?: PrismaClientOrTransaction) {
646-
await workerQueue.enqueue("v3.processBatchTaskRunV3", options, {
646+
await workerQueue.enqueue("runengine.processBatchTaskRun", options, {
647647
tx,
648648
jobKey: `BatchTriggerV3Service.process:${options.batchId}:${options.processingId}`,
649649
});

apps/webapp/app/v3/services/triggerTaskV2.server.ts renamed to apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 25 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { RunEngine, RunDuplicateIdempotencyKeyError } from "@internal/run-engine";
1+
import { RunDuplicateIdempotencyKeyError, RunEngine } from "@internal/run-engine";
22
import {
33
IOPacket,
44
packetRequiresOffloading,
@@ -14,34 +14,33 @@ import {
1414
sanitizeQueueName,
1515
stringifyDuration,
1616
} from "@trigger.dev/core/v3/isomorphic";
17-
import { Prisma, TaskRun } from "@trigger.dev/database";
17+
import { Prisma } from "@trigger.dev/database";
1818
import { env } from "~/env.server";
19-
import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
19+
import { createTags, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
2020
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
2121
import { autoIncrementCounter } from "~/services/autoIncrementCounter.server";
2222
import { logger } from "~/services/logger.server";
2323
import { getEntitlement } from "~/services/platform.v3.server";
2424
import { parseDelay } from "~/utils/delays";
2525
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
2626
import { handleMetadataPacket } from "~/utils/packets";
27-
import { eventRepository } from "../eventRepository.server";
28-
import { findCurrentWorkerFromEnvironment } from "../models/workerDeployment.server";
29-
import { uploadPacketToObjectStore } from "../r2.server";
30-
import { isFinalRunStatus } from "../taskStatus";
31-
import { startActiveSpan } from "../tracer.server";
32-
import { clampMaxDuration } from "../utils/maxDuration";
33-
import { ServiceValidationError, WithRunEngine } from "./baseService.server";
27+
import { eventRepository } from "../../v3/eventRepository.server";
28+
import { findCurrentWorkerFromEnvironment } from "../../v3/models/workerDeployment.server";
29+
import { uploadPacketToObjectStore } from "../../v3/r2.server";
30+
import { getTaskEventStore } from "../../v3/taskEventStore.server";
31+
import { isFinalRunStatus } from "../../v3/taskStatus";
32+
import { startActiveSpan } from "../../v3/tracer.server";
33+
import { clampMaxDuration } from "../../v3/utils/maxDuration";
34+
import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server";
3435
import {
3536
MAX_ATTEMPTS,
3637
OutOfEntitlementError,
3738
TriggerTaskServiceOptions,
3839
TriggerTaskServiceResult,
39-
} from "./triggerTask.server";
40-
import { WorkerGroupService } from "./worker/workerGroupService.server";
41-
import { getTaskEventStore } from "../taskEventStore.server";
40+
} from "../../v3/services/triggerTask.server";
41+
import { WorkerGroupService } from "../../v3/services/worker/workerGroupService.server";
4242

43-
/** @deprecated Use TriggerTaskService in `triggerTask.server.ts` instead. */
44-
export class TriggerTaskServiceV2 extends WithRunEngine {
43+
export class RunEngineTriggerTaskService extends WithRunEngine {
4544
public async call({
4645
taskId,
4746
environment,
@@ -299,20 +298,13 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
299298
span.setAttribute("queueName", queueName);
300299

301300
//upsert tags
302-
let tags: { id: string; name: string }[] = [];
303-
const bodyTags =
304-
typeof body.options?.tags === "string" ? [body.options.tags] : body.options?.tags;
305-
if (bodyTags && bodyTags.length > 0) {
306-
for (const tag of bodyTags) {
307-
const tagRecord = await createTag({
308-
tag,
309-
projectId: environment.projectId,
310-
});
311-
if (tagRecord) {
312-
tags.push(tagRecord);
313-
}
314-
}
315-
}
301+
const tags = await createTags(
302+
{
303+
tags: body.options?.tags,
304+
projectId: environment.projectId,
305+
},
306+
this._prisma
307+
);
316308

317309
const depth = parentRun ? parentRun.depth + 1 : 0;
318310

@@ -372,6 +364,10 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
372364
machine: body.options?.machine,
373365
priorityMs: body.options?.priority ? body.options.priority * 1_000 : undefined,
374366
releaseConcurrency: body.options?.releaseConcurrency,
367+
queueTimestamp:
368+
parentRun && body.options?.resumeParentOnCompletion
369+
? parentRun.queueTimestamp ?? undefined
370+
: undefined,
375371
},
376372
this._prisma
377373
);

apps/webapp/app/services/worker.server.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ import { reportInvocationUsage } from "./platform.v3.server";
2929
import { logger } from "./logger.server";
3030
import { BatchProcessingOptions, BatchTriggerV3Service } from "~/v3/services/batchTriggerV3.server";
3131
import {
32-
BatchProcessingOptions as BatchProcessingOptionsV4,
33-
BatchTriggerV4Service,
34-
} from "~/v3/services/batchTriggerV4.server";
32+
BatchProcessingOptions as RunEngineBatchProcessingOptions,
33+
RunEngineBatchTriggerService,
34+
} from "~/runEngine/services/batchTrigger.server";
3535

3636
const workerCatalog = {
3737
scheduleEmail: DeliverEmailSchema,
@@ -99,7 +99,7 @@ const workerCatalog = {
9999
}),
100100
"v3.cancelDevSessionRuns": CancelDevSessionRunsServiceOptions,
101101
"v3.processBatchTaskRun": BatchProcessingOptions,
102-
"v3.processBatchTaskRunV3": BatchProcessingOptionsV4,
102+
"runengine.processBatchTaskRun": RunEngineBatchProcessingOptions,
103103
};
104104

105105
let workerQueue: ZodWorker<typeof workerCatalog>;
@@ -341,11 +341,11 @@ function getWorkerQueue() {
341341
await service.processBatchTaskRun(payload);
342342
},
343343
},
344-
"v3.processBatchTaskRunV3": {
344+
"runengine.processBatchTaskRun": {
345345
priority: 0,
346346
maxAttempts: 5,
347347
handler: async (payload, job) => {
348-
const service = new BatchTriggerV4Service(payload.strategy);
348+
const service = new RunEngineBatchTriggerService(payload.strategy);
349349

350350
await service.processBatchTaskRun(payload);
351351
},

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
44
import { determineEngineVersion } from "../engineVersion.server";
55
import { WithRunEngine } from "./baseService.server";
66
import { TriggerTaskServiceV1 } from "./triggerTaskV1.server";
7-
import { TriggerTaskServiceV2 } from "./triggerTaskV2.server";
7+
import { RunEngineTriggerTaskService } from "~/runEngine/services/triggerTask.server";
88

99
export type TriggerTaskServiceOptions = {
1010
idempotencyKey?: string;
@@ -78,7 +78,7 @@ export class TriggerTaskService extends WithRunEngine {
7878
body: TriggerTaskRequestBody,
7979
options: TriggerTaskServiceOptions = {}
8080
): Promise<TriggerTaskServiceResult | undefined> {
81-
const service = new TriggerTaskServiceV2({
81+
const service = new RunEngineTriggerTaskService({
8282
prisma: this._prisma,
8383
engine: this._engine,
8484
});

internal-packages/run-engine/src/engine/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,7 @@ export class RunEngine {
334334
maxAttempts,
335335
taskEventStore,
336336
priorityMs,
337+
queueTimestamp,
337338
ttl,
338339
tags,
339340
parentTaskRunId,
@@ -414,6 +415,7 @@ export class RunEngine {
414415
maxAttempts,
415416
taskEventStore,
416417
priorityMs,
418+
queueTimestamp: queueTimestamp ?? delayUntil ?? new Date(),
417419
ttl,
418420
tags:
419421
tags.length === 0
@@ -520,7 +522,6 @@ export class RunEngine {
520522
await this.enqueueSystem.enqueueRun({
521523
run: taskRun,
522524
env: environment,
523-
timestamp: Date.now() - taskRun.priorityMs,
524525
workerId,
525526
runnerId,
526527
tx: prisma,

internal-packages/run-engine/src/engine/systems/checkpointSystem.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,6 @@ export class CheckpointSystem {
161161
const newSnapshot = await this.enqueueSystem.enqueueRun({
162162
run,
163163
env: run.runtimeEnvironment,
164-
timestamp: run.createdAt.getTime() - run.priorityMs,
165164
snapshot: {
166165
status: "QUEUED",
167166
description:

internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ export class DelayedRunSystem {
9898
await this.enqueueSystem.enqueueRun({
9999
run,
100100
env: run.runtimeEnvironment,
101-
timestamp: run.createdAt.getTime() - run.priorityMs,
102101
batchId: run.batchId ?? undefined,
103102
});
104103

internal-packages/run-engine/src/engine/systems/enqueueSystem.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ export class EnqueueSystem {
2525
public async enqueueRun({
2626
run,
2727
env,
28-
timestamp,
2928
tx,
3029
snapshot,
3130
previousSnapshotId,
@@ -37,7 +36,6 @@ export class EnqueueSystem {
3736
}: {
3837
run: TaskRun;
3938
env: MinimalAuthenticatedEnvironment;
40-
timestamp: number;
4139
tx?: PrismaClientOrTransaction;
4240
snapshot?: {
4341
status?: Extract<TaskRunExecutionStatus, "QUEUED" | "QUEUED_EXECUTING">;
@@ -81,6 +79,8 @@ export class EnqueueSystem {
8179
masterQueues.push(run.secondaryMasterQueue);
8280
}
8381

82+
const timestamp = (run.queueTimestamp ?? run.createdAt).getTime() - run.priorityMs;
83+
8484
await this.$.runQueue.enqueueMessage({
8585
env,
8686
masterQueues,

internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,6 @@ export class PendingVersionSystem {
9797
await this.enqueueSystem.enqueueRun({
9898
run: updatedRun,
9999
env: backgroundWorker.runtimeEnvironment,
100-
//add to the queue using the original run created time
101-
//this should ensure they're in the correct order in the queue
102-
timestamp: updatedRun.createdAt.getTime() - updatedRun.priorityMs,
103100
tx,
104101
});
105102
});

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,6 @@ export class WaitpointSystem {
540540
await this.enqueueSystem.enqueueRun({
541541
run,
542542
env: run.runtimeEnvironment,
543-
timestamp: run.createdAt.getTime() - run.priorityMs,
544543
snapshot: {
545544
status: "QUEUED_EXECUTING",
546545
description: "Run can continue, but is waiting for concurrency",
@@ -564,7 +563,6 @@ export class WaitpointSystem {
564563
await this.enqueueSystem.enqueueRun({
565564
run,
566565
env: run.runtimeEnvironment,
567-
timestamp: run.createdAt.getTime() - run.priorityMs,
568566
snapshot: {
569567
description: "Run was QUEUED, because all waitpoints are completed",
570568
},

0 commit comments

Comments
 (0)