Skip to content

Commit e837500

Browse files
authored
feat: v4 deadlock detection (#1970)
* Locked task runs will now require queues and tasks to be in the locked version * Client errors caught in a run function will now skip retrying * Extracted out the trigger queues logic * Extract validation, idempotency keys, and payloads into concerns * Extracted a bunch more stuff and got trigger tests working * Add queue and locked version tests * Deadlock detection WIP * More deadlock detection * Only detect deadlocks when the parent run is waiting on the child run * Improve the error experience around deadlocks * A couple of tweaks to make CodeRabbit happy and fix the tests in CI * Fixed failing test * Changeset * WIP * Make sure to scope queries to the runtime env
1 parent 60fcf5f commit e837500

File tree

38 files changed

+3047
-552
lines changed

38 files changed

+3047
-552
lines changed

.changeset/tender-jobs-collect.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"trigger.dev": patch
3+
---
4+
5+
TriggerApiError 4xx errors will no longer cause tasks to be retried

.vscode/launch.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
{
2626
"type": "node-terminal",
2727
"request": "launch",
28-
"name": "Debug fairDequeuingStrategy.test.ts",
29-
"command": "pnpm run test -t FairDequeuingStrategy",
28+
"name": "Debug triggerTask.test.ts",
29+
"command": "pnpm run test --run ./test/engine/triggerTask.test.ts",
3030
"envFile": "${workspaceFolder}/.env",
3131
"cwd": "${workspaceFolder}/apps/webapp",
3232
"sourceMaps": true

apps/webapp/app/components/runs/v3/SpanEvents.tsx

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,13 @@ export function SpanEventError({
8383
time={spanEvent.time}
8484
titleClassName="text-rose-500"
8585
/>
86-
{enhancedException.message && <Callout variant="error">{enhancedException.message}</Callout>}
86+
{enhancedException.message && (
87+
<Callout variant="error">
88+
<pre className="text-wrap font-sans text-sm font-normal text-rose-200">
89+
{enhancedException.message}
90+
</pre>
91+
</Callout>
92+
)}
8793
{enhancedException.link &&
8894
(enhancedException.link.magic === "CONTACT_FORM" ? (
8995
<Feedback

apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
import { TaskRun } from "@trigger.dev/database";
88
import { z } from "zod";
99
import { env } from "~/env.server";
10+
import { EngineServiceValidationError } from "~/runEngine/concerns/errors";
1011
import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server";
1112
import { logger } from "~/services/logger.server";
1213
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
@@ -116,7 +117,9 @@ const { action, loader } = createActionApiRoute(
116117
);
117118
} catch (error) {
118119
if (error instanceof ServiceValidationError) {
119-
return json({ error: error.message }, { status: 422 });
120+
return json({ error: error.message }, { status: error.status ?? 422 });
121+
} else if (error instanceof EngineServiceValidationError) {
122+
return json({ error: error.message }, { status: error.status ?? 422 });
120123
} else if (error instanceof OutOfEntitlementError) {
121124
return json({ error: error.message }, { status: 422 });
122125
} else if (error instanceof Error) {

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -808,7 +808,13 @@ function RunError({ error }: { error: TaskRunError }) {
808808
return (
809809
<div className="flex flex-col gap-2 rounded-sm border border-rose-500/50 px-3 pb-3 pt-2">
810810
<Header3 className="text-rose-500">{name}</Header3>
811-
{enhancedError.message && <Callout variant="error">{enhancedError.message}</Callout>}
811+
{enhancedError.message && (
812+
<Callout variant="error">
813+
<pre className="text-wrap font-sans text-sm font-normal text-rose-200">
814+
{enhancedError.message}
815+
</pre>
816+
</Callout>
817+
)}
812818
{enhancedError.link &&
813819
(enhancedError.link.magic === "CONTACT_FORM" ? (
814820
<Feedback
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
/**
 * Error raised by the run-engine trigger concerns when a trigger request
 * cannot be processed.
 *
 * `status` is an optional numeric status attached to the error. The task
 * trigger API route uses it as the HTTP response status and falls back to
 * 422 when it is not set.
 */
export class EngineServiceValidationError extends Error {
  /** Optional status code supplied by the code that threw the error. */
  public status?: number;

  /**
   * @param message Human-readable description of the failure.
   * @param status Optional status code to expose on the error.
   */
  constructor(message: string, status?: number) {
    super(message);
    this.status = status;
    this.name = "EngineServiceValidationError";
  }
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
import { RunId } from "@trigger.dev/core/v3/isomorphic";
2+
import type { PrismaClientOrTransaction, TaskRun } from "@trigger.dev/database";
3+
import { logger } from "~/services/logger.server";
4+
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
5+
import type { RunEngine } from "~/v3/runEngine.server";
6+
import type { TraceEventConcern, TriggerTaskRequest } from "../types";
7+
8+
/** An existing run matched the idempotency key and should be reused. */
type CachedIdempotencyResult = { isCached: true; run: TaskRun };

/**
 * No reusable run was found; carries the key and expiry (when present) that
 * the caller should use for the run it is about to create.
 */
type UncachedIdempotencyResult = {
  isCached: false;
  idempotencyKey?: string;
  idempotencyKeyExpiresAt?: Date;
};

/** Result of resolving the idempotency key for a trigger request. */
export type IdempotencyKeyConcernResult = CachedIdempotencyResult | UncachedIdempotencyResult;
11+
12+
/**
 * Resolves the idempotency key for a trigger request and decides whether an
 * existing run can be returned instead of creating a new one.
 */
export class IdempotencyKeyConcern {
  constructor(
    private readonly prisma: PrismaClientOrTransaction,
    private readonly engine: RunEngine,
    private readonly traceEventConcern: TraceEventConcern
  ) {}

  /**
   * Looks up a run in the request's runtime environment with the same task
   * identifier and idempotency key.
   *
   * - No key on the request: returns `isCached: false` immediately.
   * - Matching run whose key has expired: clears the key on that run and
   *   returns `isCached: false` so a new run can be created.
   * - Matching run whose key is still valid: returns `isCached: true` with the
   *   run. If the request asks to resume a parent on completion and the run
   *   has an associated waitpoint, the parent run is first blocked on that
   *   waitpoint.
   *
   * @param request The trigger request being processed.
   * @returns Either the cached run, or the key/expiry to use for a new run.
   */
  async handleTriggerRequest(request: TriggerTaskRequest): Promise<IdempotencyKeyConcernResult> {
    // Explicit request options take precedence over the values in the body.
    const idempotencyKey = request.options?.idempotencyKey ?? request.body.options?.idempotencyKey;
    const idempotencyKeyExpiresAt =
      request.options?.idempotencyKeyExpiresAt ??
      resolveIdempotencyKeyTTL(request.body.options?.idempotencyKeyTTL) ??
      new Date(Date.now() + 24 * 60 * 60 * 1000 * 30); // 30 days

    if (!idempotencyKey) {
      return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
    }

    // The key is known to be set here, so the lookup is unconditional.
    const existingRun = await this.prisma.taskRun.findFirst({
      where: {
        runtimeEnvironmentId: request.environment.id,
        idempotencyKey,
        taskIdentifier: request.taskId,
      },
      include: {
        associatedWaitpoint: true,
      },
    });

    if (!existingRun) {
      return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
    }

    const hasExpired =
      existingRun.idempotencyKeyExpiresAt && existingRun.idempotencyKeyExpiresAt < new Date();

    if (hasExpired) {
      // Log the resolved key: it may have come from the request body rather
      // than from `request.options`.
      logger.debug("[TriggerTaskService][call] Idempotency key has expired", {
        idempotencyKey,
        run: existingRun,
      });

      // Update the existing run to remove the idempotency key
      await this.prisma.taskRun.updateMany({
        where: { id: existingRun.id, idempotencyKey },
        data: { idempotencyKey: null, idempotencyKeyExpiresAt: null },
      });

      return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
    }

    const associatedWaitpoint = existingRun.associatedWaitpoint;
    const parentRunId = request.body.options?.parentRunId;
    const resumeParentOnCompletion = request.body.options?.resumeParentOnCompletion;

    // We're using `andWait` so we need to block the parent run with a waitpoint
    if (associatedWaitpoint && resumeParentOnCompletion && parentRunId) {
      await this.traceEventConcern.traceIdempotentRun(
        request,
        {
          existingRun,
          idempotencyKey,
          incomplete: associatedWaitpoint.status === "PENDING",
          isError: associatedWaitpoint.outputIsError,
        },
        async (event) => {
          // Block the parent run with the existing run's waitpoint
          await this.engine.blockRunWithWaitpoint({
            runId: RunId.fromFriendlyId(parentRunId),
            waitpoints: associatedWaitpoint.id,
            spanIdToComplete: event.spanId,
            batch: request.options?.batchId
              ? {
                  id: request.options.batchId,
                  index: request.options.batchIndex ?? 0,
                }
              : undefined,
            projectId: request.environment.projectId,
            organizationId: request.environment.organizationId,
            tx: this.prisma,
            releaseConcurrency: request.body.options?.releaseConcurrency,
          });
        }
      );
    }

    return { isCached: true, run: existingRun };
  }
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import { IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/v3";
2+
import { PayloadProcessor, TriggerTaskRequest } from "../types";
3+
import { env } from "~/env.server";
4+
import { startActiveSpan } from "~/v3/tracer.server";
5+
import { uploadPacketToObjectStore } from "~/v3/r2.server";
6+
import { EngineServiceValidationError } from "./errors";
7+
8+
/**
 * Default `PayloadProcessor`: turns the payload of a trigger request into an
 * `IOPacket`, uploading it to the object store when `packetRequiresOffloading`
 * reports that it exceeds `env.TASK_PAYLOAD_OFFLOAD_THRESHOLD`.
 */
export class DefaultPayloadProcessor implements PayloadProcessor {
  /**
   * Builds the payload packet for `request` inside a "handlePayloadPacket()" span.
   *
   * @param request The trigger request whose payload should be packaged.
   * @returns The inline packet, or an "application/store" packet whose data is
   *   the object-store filename when the payload was offloaded.
   * @throws EngineServiceValidationError with status 500 when the upload fails.
   */
  async process(request: TriggerTaskRequest): Promise<IOPacket> {
    return await startActiveSpan("handlePayloadPacket()", async (span) => {
      const packet = this.#buildPacket(
        request.body.payload,
        request.body.options?.payloadType ?? "application/json"
      );

      // Nothing to measure or upload when the packet carries no data.
      if (!packet.data) {
        return packet;
      }

      const offload = packetRequiresOffloading(packet, env.TASK_PAYLOAD_OFFLOAD_THRESHOLD);

      span.setAttribute("needsOffloading", offload.needsOffloading);
      span.setAttribute("size", offload.size);

      if (!offload.needsOffloading) {
        return packet;
      }

      const storeKey = `${request.friendlyId}/payload.json`;
      const pendingUpload = uploadPacketToObjectStore(
        storeKey,
        packet.data,
        packet.dataType,
        request.environment
      );
      const [failure] = await tryCatch(pendingUpload);

      if (failure) {
        // This is retryable
        throw new EngineServiceValidationError(
          "Failed to upload large payload to object store",
          500
        );
      }

      const storedPacket: IOPacket = { data: storeKey, dataType: "application/store" };
      return storedPacket;
    });
  }

  /**
   * Creates the initial packet: JSON payloads are stringified, string payloads
   * of any other type are passed through, and anything else yields a packet
   * with no data.
   */
  #buildPacket(payload: unknown, payloadType: string): IOPacket {
    if (payloadType === "application/json") {
      return { data: JSON.stringify(payload), dataType: "application/json" };
    }

    return typeof payload === "string"
      ? { data: payload, dataType: payloadType }
      : { dataType: payloadType };
  }
}

0 commit comments

Comments
 (0)